00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "orbsvcs/AV/AVStreams_i.h"
00014 #include "orbsvcs/AV/sfp.h"
00015 #include "orbsvcs/AV/MCast.h"
00016 #include "orbsvcs/AV/RTCP.h"
00017
00018 #include "tao/debug.h"
00019 #include "tao/ORB_Core.h"
00020 #include "tao/AnyTypeCode/Any.h"
00021 #include "ace/OS_NS_arpa_inet.h"
00022
00023 #if !defined (__ACE_INLINE__)
00024 #include "orbsvcs/AV/AVStreams_i.inl"
00025 #endif
00026
00027 ACE_RCSID (AV,
00028 AVStreams_i,
00029 "$Id: AVStreams_i.cpp 78820 2007-07-07 20:06:46Z sowayaa $")
00030
00031 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00032
00033
00034
00035
00036
00037 TAO_AV_QoS::TAO_AV_QoS (void)
00038 {
00039 }
00040
00041 TAO_AV_QoS::TAO_AV_QoS (AVStreams::streamQoS &stream_qos)
00042 {
00043 this->set (stream_qos);
00044 }
00045
00046 int
00047 TAO_AV_QoS::convert (AVStreams::streamQoS &)
00048 {
00049 return -1;
00050 }
00051
00052
00053
00054
00055
00056 AV_Null_MediaCtrl::AV_Null_MediaCtrl (void)
00057 {
00058 }
00059
00060 AV_Null_MediaCtrl::~AV_Null_MediaCtrl (void)
00061 {
00062 }
00063
00064
00065
00066
00067
00068
00069
00070 TAO_Basic_StreamCtrl::TAO_Basic_StreamCtrl (void)
00071 :flow_count_ (0)
00072 {
00073 }
00074
00075
00076
00077
00078 void
00079 TAO_Basic_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec)
00080 {
00081 try
00082 {
00083
00084
00085 if (this->flow_connection_map_.current_size () > 0)
00086 {
00087 if (flow_spec.length () > 0)
00088 for (u_int i=0;i<flow_spec.length ();i++)
00089 {
00090 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00091 ACE_CString flow_name_key (flowname);
00092 AVStreams::FlowConnection_var flow_connection_entry;
00093 if (this->flow_connection_map_.find (flow_name_key,
00094 flow_connection_entry) == 0)
00095 {
00096 flow_connection_entry->stop ();
00097 }
00098 }
00099 else
00100 {
00101
00102 FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00103 FlowConnection_Map_Entry *entry;
00104 for (;iterator.next (entry) != 0;iterator.advance ())
00105 {
00106 entry->int_id_->stop ();
00107 }
00108 }
00109 }
00110 }
00111 catch (const CORBA::Exception& ex)
00112 {
00113 ex._tao_print_exception ("TAO_Basic_StreamCtrl::stop");
00114 return;
00115 }
00116 }
00117
00118
00119
00120 void
00121 TAO_Basic_StreamCtrl::start (const AVStreams::flowSpec &flow_spec)
00122 {
00123 try
00124 {
00125
00126
00127
00128 if (this->flow_connection_map_.current_size () > 0)
00129 {
00130 if (flow_spec.length () > 0)
00131 for (u_int i = 0; i < flow_spec.length (); i++)
00132 {
00133 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00134 ACE_CString flow_name_key (flowname);
00135 FlowConnection_Map::ENTRY *flow_connection_entry = 0;
00136 if (this->flow_connection_map_.find (flow_name_key,
00137 flow_connection_entry) == 0)
00138 {
00139 flow_connection_entry->int_id_->start ();
00140 }
00141 }
00142 else
00143 {
00144
00145 FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00146 FlowConnection_Map_Entry *entry = 0;
00147 for (;iterator.next (entry) != 0;iterator.advance ())
00148 {
00149 entry->int_id_->start ();
00150 }
00151 }
00152 }
00153 }
00154 catch (const CORBA::Exception& ex)
00155 {
00156 ex._tao_print_exception ("TAO_Basic_StreamCtrl::start");
00157 return;
00158 }
00159 }
00160
00161
00162
00163
00164
00165 void
00166 TAO_Basic_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec)
00167 {
00168 try
00169 {
00170
00171 if (this->flow_connection_map_.current_size () > 0)
00172 {
00173 if (flow_spec.length () > 0)
00174 {
00175 for (u_int i=0;i<flow_spec.length ();i++)
00176 {
00177 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00178 ACE_CString flow_name_key (flowname);
00179 FlowConnection_Map::ENTRY *flow_connection_entry = 0;
00180 if (this->flow_connection_map_.find (flow_name_key, flow_connection_entry) == 0)
00181 {
00182 flow_connection_entry->int_id_->destroy ();
00183 }
00184 }
00185 }
00186 else
00187 {
00188
00189 FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00190 FlowConnection_Map_Entry *entry = 0;
00191 for (;iterator.next (entry) != 0;iterator.advance ())
00192 {
00193 entry->int_id_->destroy ();
00194 }
00195 }
00196 }
00197 }
00198 catch (const CORBA::Exception& ex)
00199 {
00200 ex._tao_print_exception ("TAO_Basic_StreamCtrl::destroy");
00201 return;
00202 }
00203 }
00204
00205
00206
00207 CORBA::Boolean
00208
00209 TAO_Basic_StreamCtrl::modify_QoS (AVStreams::streamQoS & ,
00210 const AVStreams::flowSpec &)
00211 {
00212 return 1;
00213 }
00214
00215
00216
00217 void
00218 TAO_Basic_StreamCtrl::push_event (const struct CosPropertyService::Property &)
00219 {
00220 if (TAO_debug_level > 0)
00221 ACE_DEBUG ((LM_DEBUG, "\n(%P|%t) Recieved event \""));
00222 }
00223
00224
00225 void
00226 TAO_Basic_StreamCtrl::set_FPStatus (const AVStreams::flowSpec &flow_spec,
00227 const char *fp_name,
00228 const CORBA::Any &fp_settings)
00229
00230 {
00231 if (!CORBA::is_nil (this->sep_a_.in ()))
00232 {
00233 this->sep_a_->set_FPStatus (flow_spec, fp_name, fp_settings);
00234 }
00235 }
00236
00237
00238 CORBA::Object_ptr
00239 TAO_Basic_StreamCtrl::get_flow_connection (const char *flow_name)
00240 {
00241 ACE_CString flow_name_key (flow_name);
00242 AVStreams::FlowConnection_var flow_connection_entry;
00243
00244 if (this->flow_connection_map_.find (flow_name_key, flow_connection_entry) == 0){
00245 return flow_connection_entry._retn();
00246 }
00247 else{
00248 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) Cannot find flow: %s\n", flow_name ));
00249 throw AVStreams::noSuchFlow ();
00250 }
00251 }
00252
00253
00254 void
00255 TAO_Basic_StreamCtrl::set_flow_connection (const char *flow_name,
00256 CORBA::Object_ptr flow_connection_obj)
00257 {
00258 AVStreams::FlowConnection_var flow_connection;
00259 try
00260 {
00261 flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj);
00262 }
00263 catch (const CORBA::Exception& ex)
00264 {
00265 ex._tao_print_exception (
00266 "TAO_Basic_StreamCtrl::set_flow_connection");
00267 return;
00268 }
00269
00270 this->flows_.length (this->flow_count_ + 1);
00271 this->flows_ [this->flow_count_++] = CORBA::string_dup (flow_name);
00272 ACE_CString flow_name_key (flow_name);
00273 if (this->flow_connection_map_.bind (flow_name_key, flow_connection) != 0)
00274 {
00275 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) Cannot find flow: %s\n", flow_name ));
00276 throw AVStreams::noSuchFlow ();
00277 }
00278 }
00279
00280 TAO_Basic_StreamCtrl::~TAO_Basic_StreamCtrl (void)
00281 {
00282 }
00283
00284
00285
00286
00287
00288 CORBA::Boolean
00289 TAO_Negotiator::negotiate (AVStreams::Negotiator_ptr ,
00290 const AVStreams::streamQoS &)
00291 {
00292 ACE_DEBUG ((LM_DEBUG,
00293 "TAO_Negotiator::negotiate\n"));
00294 return 0;
00295 }
00296
00297
00298
00299
00300
00301 const int MMDevice_Map_Hash_Key::hash_maximum_ = 10000;
00302
00303
00304 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (void)
00305 {
00306 this->mmdevice_ = AVStreams::MMDevice::_nil ();
00307 }
00308
00309
00310 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (AVStreams::MMDevice_ptr mmdevice)
00311 {
00312 this->mmdevice_ = AVStreams::MMDevice::_duplicate (mmdevice);
00313 }
00314
00315
00316 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (const MMDevice_Map_Hash_Key& hash_key)
00317 {
00318 this->mmdevice_ = AVStreams::MMDevice::_duplicate (hash_key.mmdevice_);
00319 }
00320
00321
00322 MMDevice_Map_Hash_Key::~MMDevice_Map_Hash_Key (void)
00323 {
00324 CORBA::release (this->mmdevice_);
00325 }
00326
00327 bool
00328 MMDevice_Map_Hash_Key::operator == (const MMDevice_Map_Hash_Key &hash_key) const
00329 {
00330 CORBA::Boolean result = 0;
00331 try
00332 {
00333 result =
00334 this->mmdevice_->_is_equivalent (hash_key.mmdevice_);
00335 }
00336 catch (const CORBA::Exception& ex)
00337 {
00338 ex._tao_print_exception (
00339 "MMDevice_Map_Hash_Key::operator == ");
00340 return false;
00341 }
00342
00343 return result;
00344 }
00345
00346 bool
00347 operator < (const MMDevice_Map_Hash_Key &left,
00348 const MMDevice_Map_Hash_Key &right)
00349 {
00350 bool result = false;
00351
00352 try
00353 {
00354 const CORBA::ULong left_hash =
00355 left.mmdevice_->_hash (left.hash_maximum_);
00356
00357 const CORBA::ULong right_hash =
00358 right.mmdevice_->_hash (right.hash_maximum_);
00359
00360 result = left_hash < right_hash;
00361 }
00362 catch (const CORBA::Exception& ex)
00363 {
00364 ex._tao_print_exception ("operator < for MMDevice_Map_Hash_Key");
00365 return false;
00366 }
00367
00368 return result;
00369 }
00370
00371 u_long
00372 MMDevice_Map_Hash_Key::hash (void) const
00373 {
00374 u_long result = 0;
00375 try
00376 {
00377 result = this->mmdevice_->_hash (this->hash_maximum_);
00378 }
00379 catch (const CORBA::Exception& ex)
00380 {
00381 ex._tao_print_exception ("MMDevice_Map_Hash_Key::hash");
00382 return 0;
00383 }
00384 return result;
00385 }
00386
00387
00388
00389
00390
00391 TAO_StreamCtrl::TAO_StreamCtrl (void)
00392 :mcastconfigif_ (0)
00393 {
00394 try
00395 {
00396 this->streamctrl_ = this->_this ();
00397 char buf [BUFSIZ];
00398 int result = ACE_OS::hostname (buf, BUFSIZ);
00399 unsigned long ipaddr = 0;
00400 if (result == 0)
00401 ipaddr = ACE_OS::inet_addr (buf);
00402 this->source_id_ = TAO_AV_RTCP::alloc_srcid (ipaddr);
00403 }
00404 catch (const CORBA::Exception& ex)
00405 {
00406 ex._tao_print_exception ("TAO_StreamCtrl::TAO_StreamCtrl");
00407 }
00408 }
00409
00410 TAO_StreamCtrl::~TAO_StreamCtrl (void)
00411 {
00412 delete this->mcastconfigif_;
00413 }
00414
00415
00416
00417
00418 void
00419 TAO_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec)
00420 {
00421 try
00422 {
00423 TAO_Basic_StreamCtrl::stop (flow_spec);
00424 if (this->flow_connection_map_.current_size () > 0)
00425 return;
00426 MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00427 MMDevice_Map::ENTRY *entry = 0;
00428 for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00429 {
00430 entry->int_id_.sep_->stop (flow_spec);
00431 }
00432 MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00433 for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00434 {
00435 entry->int_id_.sep_->stop (flow_spec);
00436 }
00437 }
00438 catch (const CORBA::Exception& ex)
00439 {
00440 ex._tao_print_exception ("TAO_Basic_StreamCtrl::stop");
00441 return;
00442 }
00443 }
00444
00445
00446
00447 void
00448 TAO_StreamCtrl::start (const AVStreams::flowSpec &flow_spec)
00449 {
00450 try
00451 {
00452 TAO_Basic_StreamCtrl::start (flow_spec);
00453 if (this->flow_connection_map_.current_size () > 0)
00454 return;
00455
00456 MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00457 MMDevice_Map::ENTRY *entry = 0;
00458 for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00459 {
00460 entry->int_id_.sep_->start (flow_spec);
00461 }
00462 MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00463 for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00464 {
00465 entry->int_id_.sep_->start (flow_spec);
00466 }
00467 }
00468 catch (const CORBA::Exception& ex)
00469 {
00470 ex._tao_print_exception ("TAO_StreamCtrl::start");
00471 return;
00472 }
00473 }
00474
00475
00476
00477
00478 void
00479 TAO_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec)
00480 {
00481 try
00482 {
00483 TAO_Basic_StreamCtrl::destroy (flow_spec);
00484 if (this->flow_connection_map_.current_size () > 0)
00485 return;
00486
00487 MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00488 MMDevice_Map::ENTRY *entry = 0;
00489 for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00490 {
00491 entry->int_id_.sep_->destroy (flow_spec);
00492 }
00493 MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00494 for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00495 {
00496 entry->int_id_.sep_->destroy (flow_spec);
00497 }
00498 }
00499 catch (const CORBA::Exception& ex)
00500 {
00501 ex._tao_print_exception ("TAO_StreamCtrl::destroy");
00502 return;
00503 }
00504
00505 int result = TAO_AV_Core::deactivate_servant (this);
00506 if (result < 0)
00507 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::destroy failed\n"));
00508 }
00509
00510
00511
00512
00513
00514 CORBA::Boolean
00515 TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
00516 AVStreams::MMDevice_ptr b_party,
00517 AVStreams::streamQoS &the_qos,
00518 const AVStreams::flowSpec &the_flows)
00519 {
00520 try
00521 {
00522 if (CORBA::is_nil (a_party) && CORBA::is_nil (b_party))
00523 ACE_ERROR_RETURN ((LM_ERROR, "Both parties are nil\n"), 0);
00524
00525 if (TAO_debug_level > 0)
00526 if (CORBA::is_nil (a_party) ||
00527 CORBA::is_nil (b_party))
00528 if (TAO_debug_level > 0)
00529 ACE_DEBUG ((LM_DEBUG,
00530 "(%P|%t) TAO_StreamCtrl::bind_devs: "
00531 "a_party or b_party is null"
00532 "Multicast mode\n"));
00533
00534
00535 CORBA::Boolean met_qos;
00536 CORBA::String_var named_vdev;
00537
00538 if (!CORBA::is_nil (a_party))
00539 {
00540 MMDevice_Map_Hash_Key find_key (a_party);
00541 MMDevice_Map_Entry find_entry;
00542 int result =
00543 this->mmdevice_a_map_.find (find_key, find_entry);
00544 if (result == 0)
00545 {
00546 if (TAO_debug_level > 0)
00547 {
00548
00549 if (TAO_debug_level > 0)
00550 ACE_DEBUG ((LM_DEBUG, "mmdevice a_party is already bound\n"));
00551 }
00552 return 1;
00553 }
00554 else
00555 {
00556 this->sep_a_ =
00557 a_party-> create_A (this->streamctrl_.in (),
00558 this->vdev_a_.out (),
00559 the_qos,
00560 met_qos,
00561 named_vdev.inout (),
00562 the_flows);
00563
00564 if (TAO_debug_level > 0)
00565 ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamCtrl::create_A: succeeded\n"));
00566
00567 CORBA::Any streamctrl_any;
00568 streamctrl_any <<= this->streamctrl_.in ();
00569 this->sep_a_->define_property ("Related_StreamCtrl",
00570 streamctrl_any);
00571
00572 CORBA::Any vdev_a_any;
00573 vdev_a_any <<= this->vdev_a_.in ();
00574 this->sep_a_->define_property ("Related_VDev",
00575 vdev_a_any);
00576
00577 CORBA::Any streamendpoint_a_any;
00578 streamendpoint_a_any <<= this->sep_a_.in ();
00579 this->vdev_a_->define_property ("Related_StreamEndpoint",
00580 streamendpoint_a_any);
00581
00582
00583 CORBA::Any mmdevice_a_any;
00584 mmdevice_a_any <<= a_party;
00585 this->vdev_a_->define_property ("Related_MMDevice",
00586 mmdevice_a_any);
00587
00588
00589 MMDevice_Map_Entry map_entry;
00590 MMDevice_Map_Hash_Key key (a_party);
00591 map_entry.sep_ = AVStreams::StreamEndPoint_A::_duplicate (this->sep_a_.in ());
00592 map_entry.vdev_ = AVStreams::VDev::_duplicate (this->vdev_a_.in ());
00593 map_entry.flowspec_ = the_flows;
00594 map_entry.qos_ = the_qos;
00595 result =
00596 this->mmdevice_a_map_.bind (key, map_entry);
00597 if (result < 0)
00598 if (TAO_debug_level > 0)
00599 ACE_DEBUG ((LM_DEBUG, "Error binding mmdevice entry in the a_map"));
00600 }
00601 }
00602
00603 if (!CORBA::is_nil (b_party))
00604 {
00605 MMDevice_Map_Hash_Key find_key (b_party);
00606 MMDevice_Map_Entry find_entry;
00607 int result =
00608 this->mmdevice_b_map_.find (find_key, find_entry);
00609 if (result == 0)
00610 {
00611
00612 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "mmdevice b_party is already bound\n"));
00613 return 1;
00614 }
00615 else
00616 {
00617 this->sep_b_ =
00618 b_party-> create_B (this->streamctrl_.in (),
00619 this->vdev_b_.out (),
00620 the_qos,
00621 met_qos,
00622 named_vdev.inout (),
00623 the_flows);
00624
00625 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamCtrl::create_B: succeeded\n"));
00626
00627 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
00628 "\n(%P|%t)stream_endpoint_b_ = %s",
00629 TAO_ORB_Core_instance ()->orb ()->object_to_string (this->sep_b_.in ())));
00630
00631 CORBA::Any streamctrl_any;
00632 streamctrl_any <<= this->streamctrl_.in ();
00633 this->sep_b_->define_property ("Related_StreamCtrl",
00634 streamctrl_any);
00635
00636 CORBA::Any vdev_b_any;
00637 vdev_b_any <<= this->vdev_b_.in ();
00638 this->sep_b_->define_property ("Related_VDev",
00639 vdev_b_any);
00640
00641 CORBA::Any streamendpoint_b_any;
00642 streamendpoint_b_any <<= this->sep_b_.in ();
00643 this->vdev_b_->define_property ("Related_StreamEndpoint",
00644 streamendpoint_b_any);
00645
00646
00647 CORBA::Any mmdevice_b_any;
00648 mmdevice_b_any <<= b_party;
00649 this->vdev_b_->define_property ("Related_MMDevice",
00650 mmdevice_b_any);
00651
00652 MMDevice_Map_Entry map_entry;
00653 MMDevice_Map_Hash_Key key (b_party);
00654 map_entry.sep_ = AVStreams::StreamEndPoint::_duplicate (this->sep_b_.in ());
00655 map_entry.vdev_ = AVStreams::VDev::_duplicate(this->vdev_b_.in ());
00656 map_entry.flowspec_ = the_flows;
00657 map_entry.qos_ = the_qos;
00658 int result =
00659 this->mmdevice_b_map_.bind (key, map_entry);
00660 if (result < 0)
00661 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Error binding mmdevice entry in the b_map"));
00662 }
00663 }
00664
00665
00666 if ((!CORBA::is_nil (a_party)) && (!CORBA::is_nil (b_party)))
00667 {
00668 CORBA::Any sep_a_peer_any;
00669 CORBA::Any sep_b_peer_any;
00670
00671 sep_a_peer_any <<= this->sep_b_.in();
00672 sep_b_peer_any <<= this->sep_a_.in();
00673 this->sep_a_->define_property ("PeerAdapter",
00674 sep_a_peer_any);
00675
00676 this->sep_b_->define_property ("PeerAdapter",
00677 sep_b_peer_any);
00678 }
00679
00680
00681 if (CORBA::is_nil (b_party) && (!CORBA::is_nil (this->vdev_a_.in ())))
00682 {
00683
00684
00685
00686 try
00687 {
00688 CORBA::Any_ptr flows_any = this->sep_a_->get_property_value ("Flows");
00689 AVStreams::flowSpec_var flows;
00690 *flows_any >>= flows.out ();
00691 for (CORBA::ULong i=0; i< flows->length ();++i)
00692 {
00693 CORBA::Object_var fep_obj =
00694 this->sep_a_->get_fep (flows [i]);
00695 try
00696 {
00697 AVStreams::FlowProducer_var producer =
00698 AVStreams::FlowProducer::_narrow (fep_obj.in ());
00699 producer->set_source_id (this->source_id_++);
00700 }
00701 catch (const CORBA::Exception& ex)
00702 {
00703 if (TAO_debug_level > 0)
00704 ACE_DEBUG ((LM_DEBUG, " %s ", static_cast<char const*>(flows[i])));
00705
00706 ex._tao_print_exception (
00707 "producer_check: not a producer");
00708
00709 }
00710 }
00711 }
00712 catch (const CORBA::Exception&)
00713 {
00714
00715
00716
00717
00718
00719
00720 this->sep_a_->set_source_id (this->source_id_++);
00721 }
00722 if (!this->mcastconfigif_)
00723 {
00724 ACE_NEW_RETURN (this->mcastconfigif_,
00725 TAO_MCastConfigIf,
00726 0);
00727
00728 this->mcastconfigif_ptr_ = this->mcastconfigif_->_this ();
00729 }
00730
00731 CORBA::Boolean result = this->vdev_a_->set_Mcast_peer (this->streamctrl_.in (),
00732 this->mcastconfigif_ptr_.in (),
00733 the_qos,
00734 the_flows);
00735 if (!result)
00736 ACE_ERROR_RETURN ((LM_ERROR, "set_Mcast_peer failed\n"), 0);
00737 }
00738
00739 if (CORBA::is_nil (a_party))
00740 {
00741 if (!CORBA::is_nil (this->vdev_b_.in ()))
00742 {
00743
00744 if (!this->mcastconfigif_)
00745 ACE_ERROR_RETURN ((LM_ERROR, "first add a source and then a sink\n"), 0);
00746 this->mcastconfigif_->set_peer (this->vdev_b_.in (),
00747 the_qos,
00748 the_flows);
00749 }
00750
00751 int connect_leaf_success = 0;
00752 try
00753 {
00754
00755
00756 connect_leaf_success = this->sep_a_->connect_leaf (this->sep_b_.in (),
00757 the_qos,
00758 the_flows);
00759 connect_leaf_success = 1;
00760 }
00761 catch (const AVStreams::notSupported&)
00762 {
00763 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "connect_leaf failed\n"));
00764 connect_leaf_success = 0;
00765 }
00766 catch (const CORBA::Exception& ex)
00767 {
00768 ex._tao_print_exception (
00769 "TAO_StreamCtrl::bind_devs");
00770 }
00771 if (!connect_leaf_success)
00772 {
00773 if (TAO_debug_level > 0)
00774 ACE_DEBUG ((LM_DEBUG,"TAO_StreamCtrl::bind_devs Multiconnect\n"));
00775 AVStreams::flowSpec connect_flows = the_flows;
00776 this->sep_a_->multiconnect (the_qos, connect_flows);
00777 this->sep_b_->multiconnect (the_qos, connect_flows);
00778 }
00779 }
00780
00781 if (!CORBA::is_nil (a_party) && !CORBA::is_nil (b_party))
00782 {
00783
00784
00785
00786
00787 if( a_party->is_property_defined("Flows") &&
00788 b_party->is_property_defined("Flows") )
00789 {
00790 if (TAO_debug_level > 0) {
00791
00792 ACE_DEBUG ((LM_DEBUG, "(%N,%l) Full profile, invoking bind()\n"));
00793
00794 }
00795
00796
00797
00798
00799 this->bind (this->sep_a_.in (),
00800 this->sep_b_.in (),
00801 the_qos,
00802 the_flows);
00803
00804
00805
00806 }
00807
00808 else if (!CORBA::is_nil (this->vdev_a_.in ()) && !CORBA::is_nil (this->vdev_b_.in ()))
00809 {
00810 if (TAO_debug_level > 0) {
00811
00812 ACE_DEBUG ((LM_DEBUG, "(%N,%l) Light profile, invoking connect()\n"));
00813
00814 }
00815
00816
00817 this->vdev_a_->set_peer (this->streamctrl_.in (),
00818 this->vdev_b_.in (),
00819 the_qos,
00820 the_flows);
00821
00822 this->vdev_b_->set_peer (this->streamctrl_.in (),
00823 this->vdev_a_.in (),
00824 the_qos,
00825 the_flows);
00826
00827
00828
00829
00830 CORBA::Boolean result =
00831 this->sep_a_->connect (this->sep_b_.in (),
00832 the_qos,
00833 the_flows);
00834 if (result == 0)
00835 ACE_ERROR_RETURN ((LM_ERROR, "sep_a->connect (sep_b) failed\n"), 0);
00836 }
00837 }
00838 }
00839 catch (const CORBA::Exception& ex)
00840 {
00841 ex._tao_print_exception ("TAO_StreamCtrl::bind_devs");
00842 return 0;
00843 }
00844 return 1;
00845 }
00846
00847
00848
00849 CORBA::Boolean
00850 TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a,
00851 AVStreams::StreamEndPoint_B_ptr sep_b,
00852 AVStreams::streamQoS &stream_qos,
00853 const AVStreams::flowSpec &flow_spec)
00854 {
00855 this->sep_a_ = AVStreams::StreamEndPoint_A::_duplicate(sep_a);
00856 this->sep_b_ = AVStreams::StreamEndPoint_B::_duplicate(sep_b);
00857
00858 int result = 0;
00859 try
00860 {
00861 if (CORBA::is_nil (sep_a_.in() ) ||
00862 CORBA::is_nil (sep_b_.in() ))
00863 ACE_ERROR_RETURN ((LM_ERROR,
00864 "(%P|%t) TAO_StreamCtrl::bind:"
00865 "a_party or b_party null!"),
00866 0);
00867
00868
00869 CORBA::Any sep_any;
00870 sep_any <<= sep_b;
00871 sep_a_->define_property ("PeerAdapter",
00872 sep_any);
00873 sep_any <<= sep_a;
00874 sep_b_->define_property ("PeerAdapter",
00875 sep_any);
00876
00877
00878
00879 AVStreams::flowSpec a_flows, b_flows;
00880 CORBA::Any_var flows_any;
00881 flows_any = sep_a_->get_property_value ("Flows");
00882 AVStreams::flowSpec *temp_flows;
00883 flows_any.in () >>= temp_flows;
00884 a_flows = *temp_flows;
00885 flows_any = sep_b_->get_property_value ("Flows");
00886 flows_any.in () >>= temp_flows;
00887 b_flows = *temp_flows;
00888 u_int i;
00889 FlowEndPoint_Map *a_fep_map;
00890 FlowEndPoint_Map *b_fep_map;
00891 ACE_NEW_RETURN (a_fep_map,
00892 FlowEndPoint_Map,
00893 0);
00894 ACE_NEW_RETURN (b_fep_map,
00895 FlowEndPoint_Map,
00896 0);
00897 for (i=0;i<a_flows.length ();i++)
00898 {
00899 const char *flowname = a_flows[i];
00900
00901 CORBA::Object_var fep_obj =
00902 sep_a_->get_fep (flowname);
00903 AVStreams::FlowEndPoint_var fep =
00904 AVStreams::FlowEndPoint::_narrow (fep_obj.in ());
00905 ACE_CString fep_key (flowname);
00906 result = a_fep_map->bind (fep_key, fep);
00907 if (result == -1)
00908 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::bind failed for %s\n", flowname));
00909 }
00910
00911 for (i=0;i<b_flows.length ();i++)
00912 {
00913 const char *flowname = b_flows[i];
00914
00915 CORBA::Object_var fep_obj =
00916 sep_b->get_fep (flowname);
00917 AVStreams::FlowEndPoint_var fep =
00918 AVStreams::FlowEndPoint::_narrow (fep_obj.in ());
00919 ACE_CString fep_key (flowname);
00920 result = b_fep_map->bind (fep_key, fep);
00921 if (result == -1)
00922 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::bind failed for %s\n", flowname));
00923 }
00924 FlowEndPoint_Map *map_a = 0, *map_b = 0;
00925 if (flow_spec.length () == 0)
00926 {
00927 map_a = a_fep_map;
00928 map_b = b_fep_map;
00929 }
00930 else
00931 {
00932 FlowEndPoint_Map *spec_fep_map_a, *spec_fep_map_b;
00933 ACE_NEW_RETURN (spec_fep_map_a,
00934 FlowEndPoint_Map,
00935 0);
00936 ACE_NEW_RETURN (spec_fep_map_b,
00937 FlowEndPoint_Map,
00938 0);
00939 for (i=0; i< flow_spec.length ();i++)
00940 {
00941 TAO_Forward_FlowSpec_Entry *entry = 0;
00942 ACE_NEW_RETURN (entry,
00943 TAO_Forward_FlowSpec_Entry,
00944 0);
00945 entry->parse (flow_spec[i]);
00946 ACE_CString fep_key (entry->flowname ());
00947 AVStreams::FlowEndPoint_var fep;
00948 result = a_fep_map->find (fep_key, fep);
00949 if (result == -1)
00950 ACE_ERROR_RETURN ((LM_ERROR, "Fep not found on A side for flowname: %s\n", flow_spec[i].in ()), 0);
00951
00952 result = spec_fep_map_a->bind (fep_key, fep);
00953 if (result == -1)
00954 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Bind failed for %s\n", flow_spec[i].in ()));
00955
00956 result = b_fep_map->find (fep_key, fep);
00957 if (result == -1)
00958 ACE_ERROR_RETURN ((LM_ERROR, "Fep not found on B side for flowname: %s\n", flow_spec[i].in ()), 0);
00959
00960 result = spec_fep_map_b->bind (fep_key, fep);
00961 if (result == -1)
00962 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Bind failed for %s\n", flow_spec[i].in ()));
00963 }
00964 map_a = spec_fep_map_a;
00965 map_b = spec_fep_map_b;
00966 }
00967
00968 TAO_AV_QoS qos (stream_qos);
00969
00970
00971 FlowEndPoint_Map_Iterator a_feps_iterator (*map_a);
00972 FlowEndPoint_Map_Entry *a_feps_entry, *b_feps_entry;
00973 try
00974 {
00975
00976 for (;a_feps_iterator.next (a_feps_entry) != 0;
00977 a_feps_iterator.advance ())
00978 {
00979 AVStreams::FlowEndPoint_var fep_a = a_feps_entry->int_id_;
00980 AVStreams::FlowEndPoint_var connected_to =
00981 fep_a->get_connected_fep ();
00982
00983 if (!CORBA::is_nil (connected_to.in ()))
00984 {
00985
00986 continue;
00987 }
00988
00989 FlowEndPoint_Map_Iterator b_feps_iterator (*map_b);
00990 for (;b_feps_iterator.next (b_feps_entry) != 0;
00991 b_feps_iterator.advance ())
00992 {
00993 AVStreams::FlowEndPoint_var fep_b = b_feps_entry->int_id_;
00994 AVStreams::FlowConnection_var flow_connection;
00995
00996 AVStreams::FlowEndPoint_var connected_to =
00997 fep_b->get_connected_fep ();
00998
00999 if (!CORBA::is_nil (connected_to.in ()))
01000 {
01001
01002 continue;
01003 }
01004
01005 if (fep_a->is_fep_compatible (fep_b.in()) == 1)
01006 {
01007
01008
01009 CORBA::Object_var flow_connection_obj;
01010 CORBA::Any_var flowname_any =
01011 fep_a->get_property_value ("FlowName");
01012 const char *flowname = 0;
01013 flowname_any.in () >>= flowname;
01014 try
01015 {
01016 flow_connection_obj =
01017 this->get_flow_connection (flowname);
01018 flow_connection =
01019 AVStreams::FlowConnection::_narrow (flow_connection_obj.in ());
01020 }
01021 catch (const CORBA::Exception&)
01022 {
01023 TAO_FlowConnection *flowConnection;
01024 ACE_NEW_RETURN (flowConnection,
01025 TAO_FlowConnection,
01026 0);
01027 flow_connection = flowConnection->_this ();
01028 this->set_flow_connection (flowname,
01029 flow_connection.in ());
01030 }
01031
01032
01033
01034
01035
01036
01037
01038 AVStreams::FlowProducer_var producer;
01039 AVStreams::FlowConsumer_var consumer;
01040
01041 try
01042 {
01043 producer =
01044 AVStreams::FlowProducer::_narrow (fep_a.in());
01045 consumer =
01046 AVStreams::FlowConsumer::_narrow (fep_b.in());
01047
01048
01049
01050 if (CORBA::is_nil (producer.in ()))
01051 {
01052 producer =
01053 AVStreams::FlowProducer::_narrow (fep_b.in());
01054 consumer =
01055 AVStreams::FlowConsumer::_narrow (fep_a.in());
01056 }
01057
01058
01059
01060
01061 ACE_ASSERT (!CORBA::is_nil (producer.in ()));
01062 ACE_ASSERT (!CORBA::is_nil (consumer.in ()));
01063 }
01064 catch (const CORBA::Exception&)
01065 {
01066
01067 throw;
01068 }
01069 CORBA::String_var fep_a_name, fep_b_name;
01070 flowname_any = fep_a->get_property_value ("FlowName");
01071 const char *temp_name;
01072 flowname_any.in () >>= temp_name;
01073 fep_a_name = CORBA::string_dup (temp_name);
01074 flowname_any = fep_b->get_property_value ("FlowName");
01075 flowname_any.in () >>= temp_name;
01076 fep_b_name = CORBA::string_dup (temp_name);
01077 AVStreams::QoS flow_qos;
01078 flow_qos.QoSType = fep_a_name;
01079 flow_qos.QoSParams.length (0);
01080 result = qos.get_flow_qos (fep_a_name.in (), flow_qos);
01081 if (result == -1)
01082 {
01083 flow_qos.QoSType = fep_b_name;
01084 result = qos.get_flow_qos (fep_b_name.in (),
01085 flow_qos);
01086 if (result == -1 && TAO_debug_level > 0)
01087 ACE_DEBUG ((LM_DEBUG,
01088 "No QoS Specified for this flow <%s>\n", flowname));
01089 }
01090 flow_connection->connect (producer.in (),
01091 consumer.in (),
01092 flow_qos);
01093 }
01094 }
01095 }
01096 }
01097 catch (const CORBA::Exception& ex)
01098 {
01099 ex._tao_print_exception ("TAO_StreamCtrl::bind:flow_connect block");
01100 return 0;
01101 }
01102 }
01103 catch (const CORBA::Exception&)
01104 {
01105
01106
01107 this->sep_a_->connect (this->sep_b_.in (),
01108 stream_qos,
01109 flow_spec);
01110 }
01111 return 1;
01112 }
01113
01114 void
01115 TAO_StreamCtrl::unbind (void)
01116 {
01117 try
01118 {
01119 if (this->flow_connection_map_.current_size () > 0)
01120 return;
01121
01122 AVStreams::flowSpec flow_spec;
01123 flow_spec.length(0);
01124
01125 MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
01126 MMDevice_Map::ENTRY *entry = 0;
01127 for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
01128 {
01129 entry->int_id_.sep_->destroy (flow_spec);
01130 }
01131 MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
01132 for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
01133 {
01134 entry->int_id_.sep_->destroy (flow_spec);
01135 }
01136 }
01137 catch (const CORBA::Exception& ex)
01138 {
01139 ex._tao_print_exception ("TAO_StreamCtrl::unbind");
01140 return;
01141 }
01142 }
01143
01144 void
01145 TAO_StreamCtrl::unbind_party (AVStreams::StreamEndPoint_ptr ,
01146 const AVStreams::flowSpec &)
01147 {
01148 }
01149
01150 void
01151 TAO_StreamCtrl::unbind_dev (AVStreams::MMDevice_ptr ,
01152 const AVStreams::flowSpec & )
01153 {
01154 }
01155
01156 AVStreams::VDev_ptr
01157 TAO_StreamCtrl::get_related_vdev (AVStreams::MMDevice_ptr adev,
01158 AVStreams::StreamEndPoint_out sep)
01159 {
01160 MMDevice_Map_Hash_Key key (adev);
01161 MMDevice_Map_Entry entry;
01162 int result = -1;
01163 result = this->mmdevice_a_map_.find (key, entry);
01164 if (result < 0)
01165 {
01166 result = this->mmdevice_a_map_.find (key, entry);
01167 if (result < 0)
01168 return AVStreams::VDev::_nil ();
01169 }
01170 sep = AVStreams::StreamEndPoint::_duplicate (entry.sep_.in ());
01171 return AVStreams::VDev::_duplicate (entry.vdev_.in ());
01172 }
01173
01174 CORBA::Boolean
01175
01176 TAO_StreamCtrl::modify_QoS (AVStreams::streamQoS &new_qos,
01177 const AVStreams::flowSpec &the_spec)
01178 {
01179 if (TAO_debug_level > 0)
01180 ACE_DEBUG ((LM_DEBUG,
01181 "TAO_StreamCtrl::modify_QoS\n"));
01182
01183
01184 if (this->mcastconfigif_ != 0)
01185 {
01186
01187 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Cannot Modify the Qos for multipoint streams\n"));
01188 }
01189 else
01190 {
01191 try
01192 {
01193 AVStreams::flowSpec in_flowspec;
01194 AVStreams::flowSpec out_flowspec;
01195
01196 in_flowspec.length (0);
01197 out_flowspec.length (0);
01198
01199 int in_index = 0;
01200 int out_index = 0;
01201
01202 AVStreams::flowSpec flowspec;
01203 if (the_spec.length () == 0)
01204 {
01205
01206 flowspec = this->flows_;
01207 MMDevice_Map_Iterator iterator (this->mmdevice_a_map_);
01208 MMDevice_Map::ENTRY *entry = 0;
01209 for (;iterator.next (entry) != 0;iterator.advance ())
01210 {
01211 flowspec = entry->int_id_.flowspec_;
01212 }
01213 }
01214 else
01215 {
01216 flowspec = the_spec;
01217 }
01218
01219 if (TAO_debug_level > 0)
01220 ACE_DEBUG ((LM_DEBUG,
01221 "TAO_StreamCtrl::modify_QoS\n"));
01222
01223
01224 for (u_int i=0;i < flowspec.length ();i++)
01225 {
01226 TAO_Forward_FlowSpec_Entry entry;
01227 entry.parse (flowspec [i]);
01228 int direction = entry.direction ();
01229 if (direction == 0)
01230 {
01231 in_flowspec.length (in_index + 1);
01232 in_flowspec [in_index++] = CORBA::string_dup (entry.entry_to_string ());
01233 }
01234 else
01235 {
01236 out_flowspec.length (out_index + 1);
01237 out_flowspec [out_index++] = CORBA::string_dup (entry.entry_to_string ());
01238 }
01239 }
01240
01241 if (in_flowspec.length () != 0)
01242 {
01243 this->vdev_a_->modify_QoS (new_qos, in_flowspec);
01244 }
01245
01246 if (out_flowspec.length () != 0)
01247 {
01248 this->vdev_b_->modify_QoS (new_qos, out_flowspec);
01249 }
01250 }
01251 catch (const CORBA::Exception& ex)
01252 {
01253 ex._tao_print_exception ("TAO_StreamCtrl::modify_QoS");
01254 return 0;
01255 }
01256
01257 }
01258 return 1;
01259 }
01260
01261
01262
01263
01264
01265 TAO_MCastConfigIf::TAO_MCastConfigIf (void)
01266 :peer_list_iterator_ (peer_list_)
01267 {
01268 }
01269
01270 TAO_MCastConfigIf::~TAO_MCastConfigIf (void)
01271 {
01272
01273 }
01274
01275
01276 CORBA::Boolean
01277 TAO_MCastConfigIf::set_peer (CORBA::Object_ptr peer,
01278 AVStreams::streamQoS & qos,
01279 const AVStreams::flowSpec & flow_spec)
01280 {
01281 try
01282 {
01283 Peer_Info *info;
01284 ACE_NEW_RETURN (info,
01285 Peer_Info,
01286 0);
01287 info->peer_ = AVStreams::VDev::_narrow (peer);
01288 info->qos_ = qos;
01289 info->flow_spec_ = flow_spec;
01290 this->peer_list_.insert_tail (info);
01291 }
01292 catch (const CORBA::Exception& ex)
01293 {
01294 ex._tao_print_exception ("TAO_MCastConfigIf::set_peer");
01295 return 0;
01296 }
01297 return 1;
01298 }
01299
01300
01301 void
01302 TAO_MCastConfigIf::configure (const CosPropertyService::Property & a_configuration)
01303 {
01304 Peer_Info *info;
01305 try
01306 {
01307 for (this->peer_list_iterator_.first ();
01308 (info = this->peer_list_iterator_.next ()) != 0;
01309 this->peer_list_iterator_.advance ())
01310 {
01311 info->peer_->configure (a_configuration);
01312 }
01313 }
01314 catch (const CORBA::Exception& ex)
01315 {
01316 ex._tao_print_exception (
01317 "TAO_MCastConfigIf::set_configure");
01318 return;
01319 }
01320 }
01321
01322
01323 void
01324 TAO_MCastConfigIf::set_initial_configuration (const CosPropertyService::Properties &initial)
01325 {
01326 this->initial_configuration_ = initial;
01327 }
01328
01329
01330 void
01331 TAO_MCastConfigIf::set_format (const char * flowName,
01332 const char * format_name)
01333 {
01334 Peer_Info *info;
01335 try
01336 {
01337 for (this->peer_list_iterator_.first ();
01338 (info = this->peer_list_iterator_.next ()) != 0;
01339 this->peer_list_iterator_.advance ())
01340 {
01341 if (this->in_flowSpec (info->flow_spec_, flowName))
01342 {
01343 info->peer_->set_format (flowName, format_name);
01344 }
01345 }
01346 }
01347 catch (const CORBA::Exception& ex)
01348 {
01349 ex._tao_print_exception ("TAO_MCastConfigIf::set_format");
01350 return;
01351 }
01352 }
01353
01354
01355 void
01356 TAO_MCastConfigIf::set_dev_params (const char * flowName,
01357 const CosPropertyService::Properties & new_params)
01358 {
01359 Peer_Info *info;
01360 try
01361 {
01362
01363 for (this->peer_list_iterator_.first ();
01364 (info = this->peer_list_iterator_.next ()) != 0;
01365 this->peer_list_iterator_.advance ())
01366 {
01367 if (this->in_flowSpec (info->flow_spec_, flowName))
01368 {
01369 info->peer_->set_dev_params (flowName, new_params);
01370 }
01371 }
01372 }
01373 catch (const CORBA::Exception& ex)
01374 {
01375 ex._tao_print_exception (
01376 "TAO_MCastConfigIf::set_dev_params");
01377 return;
01378 }
01379 }
01380
01381 int
01382 TAO_MCastConfigIf::in_flowSpec (const AVStreams::flowSpec& flow_spec, const char *flow_name)
01383 {
01384 size_t len = ACE_OS::strlen (flow_name);
01385 for (CORBA::ULong i = 0; i < flow_spec.length (); i++)
01386 if (ACE_OS::strncmp (flow_spec[i], flow_name, len) == 0)
01387 {
01388 return 1;
01389 }
01390 return 0;
01391 }
01392
01393
01394
01395
01396
01397 TAO_Base_StreamEndPoint::TAO_Base_StreamEndPoint (void)
01398 : protocol_object_set_ (0)
01399 {
01400 }
01401
01402 TAO_Base_StreamEndPoint::~TAO_Base_StreamEndPoint (void)
01403 {
01404 }
01405
01406 int
01407 TAO_Base_StreamEndPoint::handle_close (void)
01408 {
01409
01410
01411 return -1;
01412 }
01413
01414 int
01415 TAO_Base_StreamEndPoint::handle_open (void)
01416 {
01417 return 0;
01418 }
01419
01420 int
01421 TAO_Base_StreamEndPoint::handle_stop (const AVStreams::flowSpec &)
01422 {
01423 return 0;
01424 }
01425
01426 int
01427 TAO_Base_StreamEndPoint::handle_start (const AVStreams::flowSpec &)
01428 {
01429 return 0;
01430 }
01431
01432 int
01433 TAO_Base_StreamEndPoint::handle_destroy (const AVStreams::flowSpec &)
01434 {
01435 return 0;
01436 }
01437
01438
01439 CORBA::Boolean
01440 TAO_Base_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &)
01441 {
01442 return 1;
01443 }
01444
01445
01446 CORBA::Boolean
01447 TAO_Base_StreamEndPoint::handle_postconnect (AVStreams::flowSpec &)
01448 {
01449
01450 while (!this->is_protocol_object_set ())
01451 TAO_AV_CORE::instance ()->orb ()->perform_work ();
01452 return 1;
01453 }
01454
01455
01456 CORBA::Boolean
01457 TAO_Base_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec &)
01458 {
01459 return 1;
01460 }
01461
01462 int
01463 TAO_Base_StreamEndPoint::set_protocol_object (const char * ,
01464 TAO_AV_Protocol_Object * )
01465 {
01466 return -1;
01467 }
01468
01469 void
01470 TAO_Base_StreamEndPoint::protocol_object_set (void)
01471 {
01472 this->protocol_object_set_ = 1;
01473 }
01474
01475
01476 int
01477 TAO_Base_StreamEndPoint::is_protocol_object_set (void)
01478 {
01479 return this->protocol_object_set_;
01480 }
01481
01482 int
01483 TAO_Base_StreamEndPoint::get_callback (const char * ,
01484 TAO_AV_Callback *&)
01485 {
01486 return -1;
01487 }
01488
01489 int
01490 TAO_Base_StreamEndPoint::get_control_callback (const char * ,
01491 TAO_AV_Callback *&)
01492 {
01493 return -1;
01494 }
01495
01496 void
01497 TAO_Base_StreamEndPoint::set_flow_handler (const char *flowname,
01498 TAO_AV_Flow_Handler *handler)
01499 {
01500 if(TAO_debug_level > 1)
01501 {
01502 ACE_DEBUG ((LM_DEBUG, "(%N,%l) TAO_Base_StreamEndPoint::set_flow_handler(), flowname: %s\n", flowname));
01503 }
01504 ACE_CString flow_name_key (flowname);
01505 if (this->flow_handler_map_.bind (flow_name_key, handler) != 0)
01506 ACE_ERROR ((LM_ERROR,
01507 "Error in storing flow handler\n"));
01508 }
01509
01510 void
01511 TAO_Base_StreamEndPoint::set_control_flow_handler (const char *flowname,
01512 TAO_AV_Flow_Handler *handler)
01513 {
01514 ACE_CString flow_name_key (flowname);
01515 if (this->control_flow_handler_map_.bind (flow_name_key, handler) != 0)
01516 ACE_ERROR ((LM_ERROR,
01517 "Error in storing control flow handler\n"));
01518 }
01519
01520
01521
01522
01523
01524
01525
01526 TAO_StreamEndPoint::TAO_StreamEndPoint (void)
01527 :flow_count_ (0),
01528 flow_num_ (0),
01529 mcast_port_ (ACE_DEFAULT_MULTICAST_PORT+1)
01530 {
01531
01532 this->mcast_addr_.set (ACE_DEFAULT_MULTICAST_ADDR);
01533 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::TAO_StreamEndPoint::mcast_addr = %s", this->mcast_addr_.c_str ()));
01534
01535 }
01536
01537
01538 CORBA::Boolean
01539 TAO_StreamEndPoint::connect (AVStreams::StreamEndPoint_ptr responder,
01540 AVStreams::streamQoS &qos,
01541 const AVStreams::flowSpec &the_spec)
01542 {
01543 if (TAO_debug_level > 0)
01544 ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect ()\n"));
01545 CORBA::Boolean retv = 0;
01546 this->peer_sep_ = AVStreams::StreamEndPoint::_duplicate (responder);
01547 try
01548 {
01549 if (!CORBA::is_nil (this->negotiator_.in ()))
01550 {
01551 ACE_DEBUG ((LM_DEBUG,
01552 "NEGOTIATOR AVIALABLE\n"));
01553
01554 CORBA::Any_var negotiator_any = responder->get_property_value ("Negotiator");
01555
01556 AVStreams::Negotiator_ptr peer_negotiator;
01557 negotiator_any.in () >>= peer_negotiator;
01558 if (!CORBA::is_nil (peer_negotiator))
01559 {
01560 CORBA::Boolean result =
01561 this->negotiator_->negotiate (peer_negotiator,
01562 qos);
01563 if (!result)
01564 if (TAO_debug_level > 0)
01565 ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect (): negotiate failed\n"));
01566 }
01567 }
01568 }
01569 catch (const CORBA::Exception& ex)
01570 {
01571 ex._tao_print_exception ("TAO_StreamEndPoint::negotiate");
01572 }
01573
01574 try
01575 {
01576 if (this->protocols_.length () > 0)
01577 {
01578
01579 CORBA::Any_var protocols_any =
01580 responder->get_property_value ("AvailableProtocols");
01581 AVStreams::protocolSpec peer_protocols;
01582 AVStreams::protocolSpec *temp_protocols;
01583 protocols_any.in () >>= temp_protocols;
01584 peer_protocols = *temp_protocols;
01585 for (u_int i=0;i<peer_protocols.length ();i++)
01586 {
01587 for (u_int j=0;j<this->protocols_.length ();j++)
01588 if (ACE_OS::strcmp (peer_protocols [i],
01589 this->protocols_[j]) == 0)
01590 {
01591
01592 this->protocol_ = CORBA::string_dup (peer_protocols [i]);
01593 break;
01594 }
01595 }
01596 }
01597 }
01598 catch (const CORBA::Exception&)
01599 {
01600 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Availableprotocols property not defined\n"));
01601 }
01602 try
01603 {
01604 AVStreams::streamQoS network_qos;
01605 if (qos.length () > 0)
01606 {
01607 if (TAO_debug_level > 0)
01608 ACE_DEBUG ((LM_DEBUG,
01609 "QoS is Specified\n"));
01610
01611 int result = this->translate_qos (qos,
01612 network_qos);
01613 if (result != 0)
01614 if (TAO_debug_level > 0)
01615 ACE_DEBUG ((LM_DEBUG,
01616 "QoS translation failed\n"));
01617
01618 this->qos ().set (network_qos);
01619 }
01620
01621
01622 AVStreams::flowSpec flow_spec (the_spec);
01623 this->handle_preconnect (flow_spec);
01624
01625 if (TAO_debug_level > 0)
01626 ACE_DEBUG ((LM_DEBUG,
01627 "TAO_StreamEndPoint::connect: flow_spec_length = %d\n",
01628 flow_spec.length ()));
01629 u_int i;
01630 for (i=0;i<flow_spec.length ();i++)
01631 {
01632 TAO_Forward_FlowSpec_Entry *entry = 0;
01633 ACE_NEW_RETURN (entry,
01634 TAO_Forward_FlowSpec_Entry,
01635 0);
01636
01637 if (entry->parse (flow_spec[i]) == -1)
01638 return 0;
01639
01640 if (TAO_debug_level > 0)
01641 ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect: %s\n", entry->entry_to_string ()));
01642
01643 this->forward_flow_spec_set.insert (entry);
01644 }
01645
01646 int result =TAO_AV_CORE::instance ()->init_forward_flows (this,
01647 this->forward_flow_spec_set,
01648 TAO_AV_Core::TAO_AV_ENDPOINT_A,
01649 flow_spec);
01650
01651
01652 if (result < 0)
01653 ACE_ERROR_RETURN ((LM_ERROR, "%N:%l TAO_AV_Core::init_forward_flows failed\n"), 0);
01654
01655
01656 AVStreams::StreamEndPoint_var streamendpoint = this->_this ();
01657
01658 retv = responder->request_connection (streamendpoint.in (),
01659 0,
01660 network_qos,
01661 flow_spec);
01662
01663 if (TAO_debug_level > 0)
01664 ACE_DEBUG ((LM_DEBUG, "%N:%l request_connection returned %d\n", retv));
01665
01666 if (retv == 0)
01667 return retv;
01668 for (i=0;i<flow_spec.length ();i++)
01669 {
01670 TAO_Reverse_FlowSpec_Entry *entry = 0;
01671 ACE_NEW_RETURN (entry,
01672 TAO_Reverse_FlowSpec_Entry,
01673 0);
01674 if (entry->parse (flow_spec[i]) == -1)
01675 ACE_ERROR_RETURN ((LM_ERROR,
01676 "Reverse_Flow_Spec_Set::parse failed\n"),
01677 0);
01678
01679 if (TAO_debug_level > 0)
01680 ACE_DEBUG ((LM_DEBUG,
01681 "TAO_StreamEndPoint::Connect: Reverse Flow Spec %s\n",
01682 entry->entry_to_string ()));
01683
01684 this->reverse_flow_spec_set.insert (entry);
01685 }
01686
01687 result = TAO_AV_CORE::instance ()->init_reverse_flows (this,
01688 this->forward_flow_spec_set,
01689 this->reverse_flow_spec_set,
01690 TAO_AV_Core::TAO_AV_ENDPOINT_A);
01691 if (result < 0)
01692 ACE_ERROR_RETURN ((LM_ERROR,
01693 "TAO_AV_Core::init_reverse_flows failed\n"),
01694 0);
01695
01696
01697 retv = this->handle_postconnect (flow_spec);
01698 }
01699 catch (const CORBA::Exception& ex)
01700 {
01701 ex._tao_print_exception ("TAO_StreamEndPoint::connect");
01702 return 0;
01703 }
01704 return retv;
01705 }
01706
01707 int
01708 TAO_StreamEndPoint::translate_qos (const AVStreams::streamQoS& application_qos,
01709 AVStreams::streamQoS& network_qos)
01710 {
01711 u_int len = application_qos.length ();
01712 network_qos.length (len);
01713 for (u_int i=0;i<len;i++)
01714 {
01715 network_qos [i].QoSType = application_qos [i].QoSType;
01716 network_qos [i].QoSParams = application_qos [i].QoSParams;
01717 }
01718 return 0;
01719 }
01720
01721
01722
01723
01724 void
01725 TAO_StreamEndPoint::stop (const AVStreams::flowSpec &flow_spec)
01726 {
01727
01728 this->handle_stop (flow_spec);
01729
01730 if (flow_spec.length () > 0)
01731 {
01732
01733 for (u_int i=0;i<flow_spec.length ();i++)
01734 {
01735 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01736 for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01737 begin != end; ++begin)
01738 {
01739 TAO_Forward_FlowSpec_Entry entry;
01740 entry.parse (flow_spec[i]);
01741 if (ACE_OS::strcmp ((*begin)->flowname (), entry.flowname ()) == 0)
01742 {
01743 TAO_FlowSpec_Entry *entry = *begin;
01744
01745 if (entry->handler() != 0)
01746 entry->handler ()->stop (entry->role ());
01747 if (entry->control_handler () != 0)
01748 entry->control_handler ()->stop (entry->role ());
01749 break;
01750 }
01751 }
01752 }
01753 }
01754 else
01755 {
01756 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01757 for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01758 begin != end; ++begin)
01759 {
01760 TAO_FlowSpec_Entry *entry = *begin;
01761
01762 if (entry->handler() != 0)
01763 entry->handler ()->stop (entry->role ());
01764 if (entry->control_handler () != 0)
01765 entry->control_handler ()->stop (entry->role ());
01766 }
01767 }
01768 }
01769
01770
01771
01772 void
01773 TAO_StreamEndPoint::start (const AVStreams::flowSpec &flow_spec)
01774 {
01775 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::start\n"));
01776
01777 this->handle_start (flow_spec);
01778
01779 if (flow_spec.length () > 0)
01780 {
01781
01782 for (u_int i=0;i<flow_spec.length ();i++)
01783 {
01784 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01785 for (TAO_AV_FlowSpecSetItor forward_begin = this->forward_flow_spec_set.begin ();
01786 forward_begin != end; ++forward_begin)
01787 {
01788 TAO_FlowSpec_Entry *entry = *forward_begin;
01789 if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
01790 {
01791
01792 if (entry->handler () != 0)
01793 {
01794 entry->handler ()->start (entry->role ());
01795 }
01796 if (entry->control_handler () != 0)
01797 {
01798 entry->control_handler ()->start (entry->role ());
01799 }
01800 }
01801 }
01802
01803 end = this->reverse_flow_spec_set.end ();
01804 for (TAO_AV_FlowSpecSetItor reverse_begin = this->reverse_flow_spec_set.begin ();
01805 reverse_begin != end; ++reverse_begin)
01806 {
01807 TAO_FlowSpec_Entry *entry = *reverse_begin;
01808 if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
01809 {
01810
01811 if (entry->handler () != 0)
01812 {
01813 entry->handler ()->start (entry->role ());
01814 }
01815 if (entry->control_handler () != 0)
01816 {
01817 entry->control_handler ()->start (entry->role ());
01818 }
01819 }
01820 }
01821 }
01822 }
01823 else
01824 {
01825 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01826 for (TAO_AV_FlowSpecSetItor forwardbegin = this->forward_flow_spec_set.begin ();
01827 forwardbegin != end; ++forwardbegin)
01828 {
01829 TAO_FlowSpec_Entry *entry = *forwardbegin;
01830 if (entry->handler () != 0)
01831 {
01832 entry->handler ()->start (entry->role ());
01833 }
01834 if (entry->control_handler () != 0)
01835 {
01836 entry->control_handler ()->start (entry->role ());
01837 }
01838 }
01839
01840 end = this->reverse_flow_spec_set.end ();
01841 for (TAO_AV_FlowSpecSetItor reversebegin = this->reverse_flow_spec_set.begin ();
01842 reversebegin != end; ++reversebegin)
01843 {
01844 TAO_FlowSpec_Entry *entry = *reversebegin;
01845
01846 if (entry->handler () != 0)
01847 {
01848 entry->handler ()->start (entry->role ());
01849 }
01850 if (entry->control_handler () != 0)
01851 {
01852 entry->control_handler ()->start (entry->role ());
01853 }
01854 }
01855 }
01856 }
01857
01858
01859 void
01860 TAO_StreamEndPoint::destroy (const AVStreams::flowSpec &flow_spec)
01861 {
01862 CORBA::Any_var vdev_any = this->get_property_value ("Related_VDev");
01863
01864 AVStreams::VDev_ptr vdev;
01865
01866 vdev_any.in() >>= vdev;
01867 CORBA::Any_var mc_any = vdev->get_property_value ("Related_MediaCtrl");
01868
01869
01870
01871 CORBA::Object_var obj;
01872 mc_any.in() >>= CORBA::Any::to_object( obj.out() );
01873
01874 AVStreams::MediaControl_var media_ctrl =
01875 AVStreams::MediaControl::_narrow( obj.in() );
01876
01877
01878
01879 if ( !CORBA::is_nil( vdev ) )
01880 {
01881 PortableServer::ServantBase_var vdev_servant =
01882 TAO_AV_CORE::instance()->poa()->reference_to_servant ( vdev );
01883 TAO_AV_Core::deactivate_servant (vdev_servant.in());
01884 }
01885
01886 if ( !CORBA::is_nil ( media_ctrl.in () ) )
01887 {
01888 PortableServer::ServantBase_var mc_servant =
01889 TAO_AV_CORE::instance()->poa()->reference_to_servant (media_ctrl.in());
01890 TAO_AV_Core::deactivate_servant (mc_servant.in());
01891 }
01892
01893 int result = TAO_AV_Core::deactivate_servant (this);
01894 if (result < 0)
01895 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::destroy failed\n"));
01896
01897 if (flow_spec.length () > 0)
01898 {
01899 for (u_int i=0;i<flow_spec.length ();i++)
01900 {
01901 {
01902 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01903 for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01904 begin != end; ++begin)
01905 {
01906 TAO_FlowSpec_Entry *entry = *begin;
01907 TAO_Tokenizer flow_name (flow_spec [i], '\\');
01908 if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
01909 {
01910 if (entry->protocol_object ())
01911 {
01912 entry->protocol_object ()->destroy ();
01913 }
01914 break;
01915 }
01916 }
01917 }
01918 {
01919 TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
01920 for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
01921 begin != end; ++begin)
01922 {
01923 TAO_FlowSpec_Entry *entry = *begin;
01924 TAO_Tokenizer flow_name (flow_spec [i], '\\');
01925 if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
01926 {
01927 if (entry->protocol_object ())
01928 {
01929 entry->protocol_object ()->destroy ();
01930 }
01931 break;
01932 }
01933 }
01934 }
01935 }
01936 }
01937 else
01938 {
01939 {
01940 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01941 for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01942 begin != end; ++begin)
01943 {
01944 TAO_FlowSpec_Entry *entry = *begin;
01945 if (entry->protocol_object ())
01946 {
01947 entry->protocol_object ()->stop ();
01948
01949 ACE_CString control_flowname =
01950 TAO_AV_Core::get_control_flowname (entry->flowname ());
01951 TAO_AV_CORE::instance()->remove_acceptor(entry->flowname());
01952 TAO_AV_CORE::instance()->remove_acceptor(control_flowname.c_str());
01953
01954 entry->protocol_object ()->destroy ();
01955 }
01956 }
01957 }
01958 {
01959 TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
01960 for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
01961 begin != end; ++begin)
01962 {
01963 TAO_FlowSpec_Entry *entry = *begin;
01964 if (entry->protocol_object ())
01965 {
01966 entry->protocol_object ()->stop ();
01967
01968 ACE_CString control_flowname =
01969 TAO_AV_Core::get_control_flowname (entry->flowname ());
01970 TAO_AV_CORE::instance()->remove_connector(entry->flowname());
01971 TAO_AV_CORE::instance()->remove_connector(control_flowname.c_str());
01972 entry->protocol_object ()->destroy ();
01973
01974 }
01975 }
01976 }
01977 }
01978
01979
01980
01981
01982 }
01983
01984
01985
01986 CORBA::Boolean
01987 TAO_StreamEndPoint::request_connection (AVStreams::StreamEndPoint_ptr ,
01988 CORBA::Boolean ,
01989 AVStreams::streamQoS &qos,
01990 AVStreams::flowSpec &flow_spec)
01991
01992 {
01993 if (TAO_debug_level > 0)
01994 ACE_DEBUG ((LM_DEBUG,
01995 "\n(%P|%t) TAO_StreamEndPoint::request_connection called"));
01996
01997 int result = 0;
01998 try
01999 {
02000 AVStreams::streamQoS network_qos;
02001 if (qos.length () > 0)
02002 {
02003 if (TAO_debug_level > 0)
02004 ACE_DEBUG ((LM_DEBUG,
02005 "QoS is Specified\n"));
02006
02007 int result = this->translate_qos (qos, network_qos);
02008 if (result != 0)
02009 if (TAO_debug_level > 0)
02010 ACE_DEBUG ((LM_DEBUG, "QoS translation failed\n"));
02011
02012 this->qos ().set (network_qos);
02013 }
02014
02015 if (TAO_debug_level > 0)
02016 ACE_DEBUG ((LM_DEBUG,
02017 "\n(%P|%t) TAO_StreamEndPoint::request_connection: "
02018 "flowspec has length = %d and the strings are:\n",
02019 flow_spec.length ()));
02020 CORBA::ULong i;
02021
02022 for (i=0;i<flow_spec.length ();i++)
02023 {
02024 TAO_Forward_FlowSpec_Entry *entry = 0;
02025 ACE_NEW_RETURN (entry,
02026 TAO_Forward_FlowSpec_Entry,
02027 0);
02028
02029 CORBA::String_var string_entry = CORBA::string_dup (flow_spec[i]);
02030
02031 if(TAO_debug_level > 0)
02032 ACE_DEBUG(( LM_DEBUG,
02033 "%N:%l Parsing flow spec: [%s]\n",
02034 string_entry.in ()));
02035
02036 if (entry->parse (string_entry.in ()) == -1)
02037 {
02038 if (TAO_debug_level > 0)
02039 ACE_DEBUG ((LM_DEBUG,
02040 "%N:%l Error parsing flow_spec: [%s]\n",
02041 string_entry.in ()));
02042 return 0;
02043 }
02044 if (TAO_debug_level > 0)
02045 ACE_DEBUG ((LM_DEBUG,
02046 "TAO_StreamEndPoint::request_connection flow spec [%s]\n",
02047 entry->entry_to_string ()));
02048
02049 this->forward_flow_spec_set.insert (entry);
02050 }
02051
02052 result = TAO_AV_CORE::instance ()->init_forward_flows (this,
02053 this->forward_flow_spec_set,
02054 TAO_AV_Core::TAO_AV_ENDPOINT_B,
02055 flow_spec);
02056
02057 if (result < 0)
02058 return 0;
02059
02060
02061 result = this->handle_connection_requested (flow_spec);
02062 }
02063 catch (const CORBA::Exception& ex)
02064 {
02065 ex._tao_print_exception ("TAO_StreamEndpoint::request_connection");
02066 return 0;
02067 }
02068 return result;
02069 }
02070
02071 int
02072 TAO_StreamEndPoint::change_qos (AVStreams::streamQoS &new_qos,
02073 const AVStreams::flowSpec &the_flows)
02074 {
02075 if (TAO_debug_level > 0)
02076 ACE_DEBUG ((LM_DEBUG,
02077 "TAO_StreamEndPoint::change_qos\n"));
02078
02079 TAO_AV_QoS qos (new_qos);
02080 for (int i = 0; (unsigned) i < the_flows.length (); i++)
02081 {
02082 TAO_Forward_FlowSpec_Entry entry;
02083 entry.parse (the_flows [i]);
02084 ACE_CString flow_name_key (entry.flowname ());
02085 Flow_Handler_Map_Entry *handler_entry;
02086 if (this->flow_handler_map_.find (flow_name_key,
02087 handler_entry) == 0)
02088 {
02089 AVStreams::QoS flow_qos;
02090 if (qos.get_flow_qos (entry.flowname (), flow_qos) != 0)
02091 ACE_DEBUG ((LM_DEBUG,
02092 "New QoS for the flow %s is not specified\n",
02093 entry.flowname ()));
02094 int result;
02095 result = handler_entry->int_id_->change_qos (flow_qos);
02096 if (result != 0)
02097 ACE_ERROR_RETURN ((LM_ERROR,
02098 "Modifying QoS Failed\n"),
02099 -1);
02100
02101 }
02102 }
02103 return 0;
02104 }
02105
02106
02107 CORBA::Boolean
02108 TAO_StreamEndPoint::modify_QoS (AVStreams::streamQoS &new_qos,
02109 const AVStreams::flowSpec &the_flows)
02110 {
02111 if (TAO_debug_level > 0)
02112 ACE_DEBUG ((LM_DEBUG,
02113 "TAO_StreamEndPoint::modify_QoS\n"));
02114
02115 int result = this->change_qos (new_qos, the_flows);
02116
02117 if (result != 0)
02118 return 0;
02119
02120 return 1;
02121
02122 }
02123
02124
02125
02126 CORBA::Boolean
02127 TAO_StreamEndPoint::set_protocol_restriction (const AVStreams::protocolSpec &protocols)
02128 {
02129 try
02130 {
02131 CORBA::Any protocol_restriction_any;
02132
02133 protocol_restriction_any <<= protocols;
02134 this->define_property ("ProtocolRestriction",
02135 protocol_restriction_any);
02136 this->protocols_ = protocols;
02137 }
02138 catch (const CORBA::Exception& ex)
02139 {
02140 ex._tao_print_exception (
02141 "TAO_StreamEndPoint::set_protocol_restriction");
02142 return 0;
02143 }
02144 return 1;
02145 }
02146
02147
02148 void
02149 TAO_StreamEndPoint::disconnect (const AVStreams::flowSpec &the_spec)
02150 {
02151 ACE_UNUSED_ARG (the_spec);
02152 }
02153
02154
02155
02156 void
02157 TAO_StreamEndPoint::set_FPStatus (const AVStreams::flowSpec &,
02158 const char *fp_name,
02159 const CORBA::Any &fp_settings)
02160 {
02161 if (ACE_OS::strcmp (fp_name, "SFP1.0") != 0)
02162 return;
02163 fp_settings >>= this->sfp_status_;
02164
02165 }
02166
02167
02168 CORBA::Object_ptr
02169 TAO_StreamEndPoint::get_fep (const char *flow_name)
02170 {
02171 ACE_CString fep_name_key (flow_name);
02172 AVStreams::FlowEndPoint_var fep_entry;
02173 if (this->fep_map_.find (fep_name_key, fep_entry) == 0)
02174 return fep_entry._retn();
02175 return 0;
02176 }
02177
02178 char*
02179 TAO_StreamEndPoint::add_fep_i_add_property (AVStreams::FlowEndPoint_ptr fep)
02180 {
02181 ACE_CString flow_name;
02182
02183 try
02184 {
02185
02186
02187 flow_name = "flow";
02188 char tmp[255];
02189 ACE_OS::sprintf (tmp, "%u", this->flow_num_++);
02190 flow_name += tmp;
02191
02192 CORBA::Any flowname_any;
02193 flowname_any <<= flow_name.c_str ();
02194 fep->define_property ("Flow",
02195 flowname_any);
02196 }
02197 catch (const CORBA::Exception& ex)
02198 {
02199 ex._tao_print_exception ("TAO_StreamEndPoint::add_fep");
02200 return 0;
02201 }
02202 return ACE_OS::strdup( flow_name.c_str () );
02203 }
02204
02205 char*
02206 TAO_StreamEndPoint::add_fep_i (AVStreams::FlowEndPoint_ptr fep)
02207 {
02208 CORBA::String_var flow_name;
02209 try
02210 {
02211 CORBA::Any_var flow_name_any =
02212 fep->get_property_value ("FlowName");
02213
02214 const char *tmp;
02215 flow_name_any >>= tmp;
02216 flow_name = CORBA::string_dup (tmp);
02217 }
02218 catch (const CORBA::Exception&)
02219 {
02220 flow_name =
02221 this->add_fep_i_add_property (fep);
02222 }
02223 return flow_name._retn ();
02224 }
02225
02226 char *
02227 TAO_StreamEndPoint::add_fep (CORBA::Object_ptr fep_obj)
02228 {
02229 AVStreams::FlowEndPoint_var fep =
02230 AVStreams::FlowEndPoint::_narrow (fep_obj);
02231
02232 CORBA::String_var flow_name =
02233 this->add_fep_i (fep.in ());
02234
02235 try
02236 {
02237 fep->lock ();
02238
02239
02240 ACE_CString fep_name_key (CORBA::string_dup (flow_name.in ()));
02241 if (this->fep_map_.bind (fep_name_key, AVStreams::FlowEndPoint::_duplicate (fep.in ())) != 0)
02242 {
02243 throw AVStreams::streamOpFailed ();
02244 }
02245
02246 this->flow_count_++;
02247 this->flows_.length (this->flow_count_);
02248 this->flows_[this->flow_count_-1] = flow_name;
02249
02250 CORBA::Any flows_any;
02251 flows_any <<= this->flows_;
02252 this->define_property ("Flows",
02253 flows_any);
02254 }
02255 catch (const CORBA::Exception& ex)
02256 {
02257 ex._tao_print_exception ("TAO_StreamEndPoint::add_fep");
02258 return 0;
02259 }
02260 return flow_name._retn ();
02261 }
02262
02263
02264 void
02265 TAO_StreamEndPoint::remove_fep (const char *flow_name)
02266 {
02267 try
02268 {
02269 ACE_CString fep_name_key (flow_name);
02270 AVStreams::FlowEndPoint_var fep_entry;
02271
02272 if (this->fep_map_.unbind (fep_name_key, fep_entry)!= 0)
02273 throw AVStreams::streamOpFailed ();
02274
02275 AVStreams::flowSpec new_flows (this->flows_.length ());
02276 for (u_int i=0, j=0 ; i <this->flows_.length (); i++)
02277 if (ACE_OS::strcmp (flow_name, this->flows_[i]) != 0)
02278 new_flows[j++] = this->flows_[i];
02279
02280 CORBA::Any flows;
02281 flows <<= new_flows;
02282 this->flows_ = new_flows;
02283 this->define_property ("Flows",
02284 flows);
02285 }
02286 catch (const CORBA::Exception& ex)
02287 {
02288 ex._tao_print_exception ("TAO_StreamEndPoint::remove_fep");
02289 }
02290 }
02291
02292
02293 void
02294 TAO_StreamEndPoint::set_negotiator (AVStreams::Negotiator_ptr new_negotiator)
02295 {
02296 try
02297 {
02298 CORBA::Any negotiator;
02299 negotiator <<= new_negotiator;
02300 this->define_property ("Negotiator",
02301 negotiator);
02302 this->negotiator_ = AVStreams::Negotiator::_duplicate (new_negotiator);
02303 }
02304 catch (const CORBA::Exception& ex)
02305 {
02306 ex._tao_print_exception (
02307 "TAO_StreamEndPoint::set_negotiator");
02308 }
02309 }
02310
02311
02312
02313 void
02314 TAO_StreamEndPoint::set_key (const char *flow_name,
02315 const AVStreams::key & the_key)
02316 {
02317 try
02318 {
02319 this->key_ = the_key;
02320 CORBA::Any PublicKey;
02321 PublicKey <<= the_key;
02322 char PublicKey_property [BUFSIZ];
02323 ACE_OS::sprintf (PublicKey_property, "%s_PublicKey", flow_name);
02324 this->define_property (PublicKey_property,
02325 PublicKey);
02326 }
02327 catch (const CORBA::Exception& ex)
02328 {
02329 ex._tao_print_exception ("TAO_StreamEndPoint::set_key");
02330 }
02331 }
02332
02333
02334 void
02335 TAO_StreamEndPoint::set_source_id (CORBA::Long source_id)
02336 {
02337 this->source_id_ = source_id;
02338 }
02339
02340 CORBA::Boolean
02341 TAO_StreamEndPoint::multiconnect (AVStreams::streamQoS &,
02342 AVStreams::flowSpec &)
02343 {
02344 if (TAO_debug_level > 0)
02345 ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::multiconnect\n"));
02346 return 0;
02347 }
02348
02349 TAO_StreamEndPoint::~TAO_StreamEndPoint (void)
02350 {
02351
02352 TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
02353 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02354
02355 int i=0;
02356
02357
02358 for ( ; begin != end; ++begin, ++i)
02359 {
02360
02361
02362 TAO_FlowSpec_Entry *entry = *begin;
02363 delete entry;
02364
02365 }
02366 begin = this->reverse_flow_spec_set.begin ();
02367 end = this->reverse_flow_spec_set.end ();
02368 i = 0;
02369 for (; begin != end; ++begin)
02370 {
02371
02372
02373 TAO_FlowSpec_Entry *entry = *begin;
02374 delete entry;
02375
02376 }
02377 }
02378
02379
02380
02381
02382
02383
02384 TAO_StreamEndPoint_A::TAO_StreamEndPoint_A (void)
02385 {
02386 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamEndPoint_A::TAO_StreamEndPoint_A: created\n"));
02387 }
02388
02389
02390 CORBA::Boolean
02391 TAO_StreamEndPoint_A::multiconnect (AVStreams::streamQoS &stream_qos,
02392 AVStreams::flowSpec &flow_spec)
02393 {
02394 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPointA::multiconnect\n"));
02395 try
02396 {
02397 int result = 0;
02398 TAO_AV_QoS qos (stream_qos);
02399 for (u_int i=0;i< flow_spec.length ();i++)
02400 {
02401 TAO_Forward_FlowSpec_Entry *forward_entry = 0;
02402 ACE_NEW_RETURN (forward_entry,
02403 TAO_Forward_FlowSpec_Entry,
02404 0);
02405 forward_entry->parse (flow_spec[i]);
02406 ACE_CString mcast_key (forward_entry->flowname ());
02407 AVStreams::FlowEndPoint_var flow_endpoint;
02408
02409
02410
02411
02412
02413
02414
02415
02416
02417 if (this->fep_map_.find (mcast_key, flow_endpoint) == 0)
02418 {
02419 try
02420 {
02421 AVStreams::QoS flow_qos;
02422 result = qos.get_flow_qos (forward_entry->flowname (), flow_qos);
02423 if (result < 0)
02424 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "QoS not found for %s\n", forward_entry->flowname ()));
02425
02426 AVStreams::FlowProducer_var producer;
02427 producer = AVStreams::FlowProducer::_narrow (flow_endpoint.in());
02428
02429
02430 if (!CORBA::is_nil (producer.in ()))
02431 {
02432 AVStreams::FlowConnection_var flow_connection;
02433 try
02434 {
02435 if (CORBA::is_nil (this->streamctrl_.in ()))
02436 {
02437 CORBA::Any_var streamctrl_any;
02438 streamctrl_any = this->get_property_value ("Related_StreamCtrl");
02439 AVStreams::StreamCtrl_ptr streamctrl;
02440 streamctrl_any.in () >>= streamctrl;
02441 this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (streamctrl);
02442 }
02443
02444 CORBA::Object_var flow_connection_obj =
02445 this->streamctrl_->get_flow_connection (forward_entry->flowname ());
02446 flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in ());
02447 }
02448 catch (const CORBA::Exception&)
02449 {
02450 TAO_FlowConnection *flowConnection;
02451 ACE_NEW_RETURN (flowConnection,
02452 TAO_FlowConnection,
02453 0);
02454
02455 flowConnection->set_mcast_addr (this->mcast_addr_, this->mcast_port_);
02456 this->mcast_port_++;
02457 flowConnection->set_protocol (forward_entry->carrier_protocol_str ());
02458 flow_connection = flowConnection->_this ();
02459 this->streamctrl_->set_flow_connection (forward_entry->flowname (),
02460 flow_connection.in ());
02461 }
02462 if (ACE_OS::strcmp (forward_entry->flow_protocol_str (), "") != 0)
02463 {
02464 CORBA::Any fp_settings;
02465 flow_connection->use_flow_protocol (forward_entry->flow_protocol_str (),
02466 fp_settings);
02467 }
02468 result = flow_connection->add_producer (producer.in (),
02469 flow_qos);
02470 if (result == 0)
02471 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_A::multiconnect: add_producer failed\n"), 0);
02472 }
02473 }
02474 catch (const CORBA::Exception& ex)
02475 {
02476
02477 ex._tao_print_exception (
02478 "FlowProducer::_narrow");
02479 ACE_ERROR_RETURN ((LM_ERROR, "sep_a doesn't contain a flowproducer"), 0);
02480 }
02481 }
02482 else
02483 {
02484 ACE_INET_Addr *mcast_addr;
02485 TAO_FlowSpec_Entry *entry = 0;
02486 result = this->mcast_entry_map_.find (mcast_key, entry);
02487 if (result == 0)
02488 {
02489 mcast_addr = dynamic_cast<ACE_INET_Addr *> (entry->address ());
02490 char str_addr [BUFSIZ];
02491 result = mcast_addr->addr_to_string (str_addr, BUFSIZ);
02492 if (result < 0)
02493 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPointA::multiconnect ::addr_to_string failed\n"), 0);
02494 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint_A::multiconnect:%s\n", str_addr));
02495 TAO_Forward_FlowSpec_Entry new_entry (entry->flowname (),
02496 entry->direction_str (),
02497 entry->format (),
02498 entry->flow_protocol_str (),
02499 entry->carrier_protocol_str (),
02500 entry->address ());
02501 flow_spec[i] = CORBA::string_dup (new_entry.entry_to_string ());
02502 }
02503 else
02504 {
02505
02506 switch (forward_entry->direction ())
02507 {
02508 case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
02509 {
02510 ACE_NEW_RETURN (mcast_addr,
02511 ACE_INET_Addr,
02512 0);
02513 mcast_addr->set (this->mcast_port_, this->mcast_addr_.c_str ());
02514 this->mcast_port_++;
02515 char buf[BUFSIZ];
02516 mcast_addr->addr_to_string (buf, BUFSIZ);
02517 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", buf));
02518 TAO_Forward_FlowSpec_Entry *new_entry;
02519 ACE_NEW_RETURN (new_entry,
02520 TAO_Forward_FlowSpec_Entry (forward_entry->flowname (),
02521 forward_entry->direction_str (),
02522 forward_entry->format (),
02523 forward_entry->flow_protocol_str (),
02524 forward_entry->carrier_protocol_str (),
02525 mcast_addr),
02526 0);
02527 flow_spec[i] = CORBA::string_dup (new_entry->entry_to_string ());
02528
02529
02530 this->forward_flow_spec_set.insert (new_entry);
02531 TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry ();
02532 result = acceptor_registry->open (this,
02533 TAO_AV_CORE::instance (),
02534 this->forward_flow_spec_set);
02535 if (result < 0)
02536 ACE_ERROR_RETURN ((LM_ERROR, "Acceptor_Registry::open failed\n"), 0);
02537 result = this->mcast_entry_map_.bind (mcast_key, new_entry);
02538 if (result < 0)
02539 ACE_ERROR_RETURN ((LM_ERROR, "mcast_entry::bind failed"), 0);
02540 }
02541 break;
02542 case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
02543
02544 break;
02545 default:
02546 break;
02547 }
02548 }
02549 }
02550 }
02551 }
02552 catch (const CORBA::Exception& ex)
02553 {
02554 ex._tao_print_exception (
02555 "TAO_StreamEndPoint_A::multiconnect");
02556 return 0;
02557 }
02558 return 1;
02559 }
02560
02561
02562 CORBA::Boolean
02563 TAO_StreamEndPoint_A::connect_leaf (AVStreams::StreamEndPoint_B_ptr ,
02564 AVStreams::streamQoS & ,
02565 const AVStreams::flowSpec & )
02566 {
02567 throw AVStreams::notSupported ();
02568 }
02569
02570
02571 void
02572 TAO_StreamEndPoint_A::disconnect_leaf (AVStreams::StreamEndPoint_B_ptr ,
02573 const AVStreams::flowSpec & )
02574
02575 {
02576
02577 throw AVStreams::notSupported ();
02578
02579 }
02580
02581 TAO_StreamEndPoint_A::~TAO_StreamEndPoint_A (void)
02582 {
02583 }
02584
02585
02586
02587
02588
02589 TAO_StreamEndPoint_B::TAO_StreamEndPoint_B (void)
02590 {
02591 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
02592 "\n(%P|%t) TAO_StreamEndPoint_B::TAO_StreamEndPoint_B: created"));
02593 }
02594
02595 CORBA::Boolean
02596 TAO_StreamEndPoint_B::multiconnect (AVStreams::streamQoS &stream_qos,
02597 AVStreams::flowSpec &flow_spec)
02598 {
02599 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint_B::multiconnect\n"));
02600 try
02601 {
02602 int result = 0;
02603 TAO_AV_QoS qos (stream_qos);
02604 for (u_int i=0;i< flow_spec.length ();i++)
02605 {
02606 TAO_Forward_FlowSpec_Entry *forward_entry;
02607 ACE_NEW_RETURN (forward_entry,
02608 TAO_Forward_FlowSpec_Entry,
02609 0);
02610 forward_entry->parse (flow_spec[i]);
02611 ACE_CString mcast_key (forward_entry->flowname ());
02612 AVStreams::FlowEndPoint_var flow_endpoint;
02613 if (this->fep_map_.find (mcast_key, flow_endpoint ) == 0)
02614 {
02615 AVStreams::FlowConsumer_var consumer;
02616 try
02617 {
02618 consumer = AVStreams::FlowConsumer::_narrow (flow_endpoint.in ());
02619 }
02620 catch (const CORBA::Exception& ex)
02621 {
02622 ex._tao_print_exception (
02623 "FlowConsumer::_narrow");
02624 ACE_ERROR_RETURN ((LM_ERROR, "sep_b doesn't contain a flowconsumer"), 0);
02625 }
02626 AVStreams::QoS flow_qos;
02627 result = qos.get_flow_qos (forward_entry->flowname (), flow_qos);
02628 if (result < 0)
02629 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "QoS not found for %s", forward_entry->flowname ()));
02630 AVStreams::FlowConnection_var flow_connection;
02631 try
02632 {
02633 if (CORBA::is_nil (this->streamctrl_.in ()))
02634 {
02635 CORBA::Any_var streamctrl_any;
02636 streamctrl_any = this->get_property_value ("Related_StreamCtrl");
02637 AVStreams::StreamCtrl_ptr streamctrl;
02638 streamctrl_any.in () >>= streamctrl;
02639 this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (streamctrl);
02640 }
02641 CORBA::Object_var flow_connection_obj =
02642 this->streamctrl_->get_flow_connection (forward_entry->flowname ());
02643 flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in ());
02644 }
02645 catch (const CORBA::Exception& ex)
02646 {
02647 ex._tao_print_exception (
02648 "TAO_StreamEndPoint_B::multiconnect::get_flow_connection");
02649 return 0;
02650 }
02651 result = flow_connection->add_consumer (consumer.in (),
02652 flow_qos);
02653 if (result == 0)
02654 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect:add_consumer failed\n"), 0);
02655 }
02656 else
02657 {
02658 TAO_FlowSpec_Entry *mcast_entry = 0;
02659 ACE_INET_Addr *mcast_addr;
02660 mcast_addr = dynamic_cast<ACE_INET_Addr *> (forward_entry->address ());
02661 if (mcast_addr == 0)
02662 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect::Address missing in flowspec_entry\n"), 0);
02663 result = this->mcast_entry_map_.find (mcast_key, mcast_entry);
02664 if (result == 0)
02665 {
02666 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect::handler already found\n"), 0);
02667 }
02668 else
02669 {
02670 switch (forward_entry->direction ())
02671 {
02672 case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
02673 {
02674
02675
02676
02677
02678
02679
02680 this->forward_flow_spec_set.insert (forward_entry);
02681 TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry ();
02682 result = connector_registry->open (this,
02683 TAO_AV_CORE::instance (),
02684 this->forward_flow_spec_set);
02685 if (result < 0)
02686 ACE_ERROR_RETURN ((LM_ERROR, "connector_registry::open failed\n"), 0);
02687 result = this->mcast_entry_map_.bind (mcast_key, forward_entry);
02688 if (result < 0)
02689 ACE_ERROR_RETURN ((LM_ERROR, "dgram_mcast_handler::bind failed"), 0);
02690 }
02691 break;
02692 case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
02693
02694 break;
02695 default:
02696 break;
02697 }
02698 }
02699 }
02700 }
02701 }
02702 catch (const CORBA::Exception& ex)
02703 {
02704 ex._tao_print_exception (
02705 "TAO_StreamEndPoint_B::multiconnect");
02706 return 0;
02707 }
02708 return 1;
02709 }
02710
02711 TAO_StreamEndPoint_B::~TAO_StreamEndPoint_B (void)
02712 {
02713 }
02714
02715
02716
02717
02718
02719 TAO_VDev::TAO_VDev (void)
02720 {
02721 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
02722 "(%P|%t) TAO_VDev::TAO_VDev: created\n"));
02723 }
02724
02725
02726
02727 CORBA::Boolean
02728 TAO_VDev::set_peer (AVStreams::StreamCtrl_ptr the_ctrl,
02729 AVStreams::VDev_ptr the_peer_dev,
02730 AVStreams::streamQoS &the_qos,
02731 const AVStreams::flowSpec &the_spec)
02732 {
02733 ACE_UNUSED_ARG (the_qos);
02734 ACE_UNUSED_ARG (the_spec);
02735
02736 CORBA::Boolean result = 0;
02737 try
02738 {
02739 if (TAO_debug_level > 0)
02740 ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_VDev::set_peer: called\n"));
02741
02742
02743 CORBA::Any anyval;
02744 anyval <<= the_peer_dev;
02745 this->define_property ("Related_VDev",
02746 anyval);
02747
02748
02749 this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (the_ctrl);
02750 this->peer_ = AVStreams::VDev::_duplicate (the_peer_dev);
02751
02752 CORBA::Any_var anyptr;
02753 anyptr = this->peer_->get_property_value ("Related_MediaCtrl");
02754
02755 CORBA::Object_ptr media_ctrl_obj = 0;
02756
02757 anyptr.in () >>= CORBA::Any::to_object(media_ctrl_obj);
02758
02759 result = this->set_media_ctrl (media_ctrl_obj);
02760 }
02761 catch (const CORBA::Exception& ex)
02762 {
02763 ex._tao_print_exception ("TAO_VDev::set_peer");
02764 return 0;
02765 }
02766 return result;
02767 }
02768
02769 CORBA::Boolean
02770 TAO_VDev::set_media_ctrl (CORBA::Object_ptr media_ctrl)
02771
02772 {
02773
02774
02775 CORBA::release( media_ctrl);
02776
02777 return 1;
02778 }
02779
02780
02781 CORBA::Boolean
02782 TAO_VDev::set_Mcast_peer (AVStreams::StreamCtrl_ptr ,
02783 AVStreams::MCastConfigIf_ptr mcast_peer,
02784 AVStreams::streamQoS &,
02785 const AVStreams::flowSpec &)
02786 {
02787 this->mcast_peer_ = AVStreams::MCastConfigIf::_duplicate (mcast_peer);
02788 return 1;
02789 }
02790
02791
02792 void
02793 TAO_VDev::configure (const CosPropertyService::Property &)
02794 {
02795 }
02796
02797
02798 void
02799 TAO_VDev::set_format (const char *flowName,
02800 const char *format_name)
02801 {
02802 try
02803 {
02804 if (flowName == 0 || format_name == 0)
02805 ACE_ERROR ((LM_ERROR, "TAO_VDev::set_format: flowName or format_name is null\n"));
02806 char format_property [BUFSIZ];
02807 ACE_OS::sprintf (format_property, "%s_currFormat", flowName);
02808 CORBA::Any format;
02809 format <<= format_name;
02810 this->define_property (format_property,
02811 format);
02812 }
02813 catch (const CORBA::Exception& ex)
02814 {
02815 ex._tao_print_exception ("TAO_VDev::set_format");
02816 return;
02817 }
02818 return;
02819 }
02820
02821
02822 void
02823 TAO_VDev::set_dev_params (const char *flowName,
02824 const CosPropertyService::Properties &new_params)
02825 {
02826 try
02827 {
02828 if (flowName == 0)
02829 ACE_ERROR ((LM_ERROR, "TAO_VDev::set_dev_params:flowName is null\n"));
02830 char devParams_property[BUFSIZ];
02831 ACE_OS::sprintf (devParams_property, "%s_devParams", flowName);
02832 CORBA::Any devParams;
02833 devParams <<= new_params;
02834 this->define_property (devParams_property,
02835 devParams);
02836 }
02837 catch (const CORBA::Exception& ex)
02838 {
02839 ex._tao_print_exception ("TAO_VDev::set_dev_params");
02840 return;
02841 }
02842 return;
02843 }
02844
02845
02846 CORBA::Boolean
02847 TAO_VDev::modify_QoS (AVStreams::streamQoS &the_qos,
02848 const AVStreams::flowSpec &flowspec)
02849 {
02850 if (TAO_debug_level > 0)
02851 ACE_DEBUG ((LM_DEBUG,
02852 "TAO_VDev::modify_QoS\n"));
02853
02854 if (flowspec.length () != 0)
02855 {
02856 TAO_Forward_FlowSpec_Entry entry;
02857 entry.parse (flowspec [0]);
02858 int direction = entry.direction ();
02859 if (direction == 0)
02860 {
02861 AVStreams::StreamEndPoint_A_ptr sep_a;
02862
02863 CORBA::Any_ptr streamendpoint_a_any =
02864 this->get_property_value ("Related_StreamEndpoint");
02865
02866 *streamendpoint_a_any >>= sep_a;
02867 if (sep_a != 0)
02868 {
02869 sep_a->modify_QoS (the_qos, flowspec);
02870 }
02871 else ACE_DEBUG ((LM_DEBUG,
02872 "Stream EndPoint Not Found\n"));
02873 }
02874 else
02875 {
02876 AVStreams::StreamEndPoint_B_ptr sep_b;
02877
02878 CORBA::Any_ptr streamendpoint_b_any =
02879 this->get_property_value ("Related_StreamEndpoint");
02880 *streamendpoint_b_any >>= sep_b;
02881 sep_b->modify_QoS (the_qos, flowspec);
02882 }
02883 }
02884 return 1;
02885 }
02886
02887 TAO_VDev::~TAO_VDev (void)
02888 {
02889 }
02890
02891
02892
02893
02894
02895
02896 TAO_MMDevice::TAO_MMDevice (TAO_AV_Endpoint_Strategy *endpoint_strategy)
02897 : endpoint_strategy_ (endpoint_strategy),
02898 flow_count_ (0),
02899 flow_num_ (0),
02900 stream_ctrl_ (0)
02901 {
02902 }
02903
02904
02905
02906 AVStreams::StreamCtrl_ptr
02907 TAO_MMDevice::bind (AVStreams::MMDevice_ptr peer_device,
02908 AVStreams::streamQoS &the_qos,
02909 CORBA::Boolean_out is_met,
02910 const AVStreams::flowSpec &the_spec)
02911 {
02912 AVStreams::StreamCtrl_ptr streamctrl (AVStreams::StreamCtrl::_nil ());
02913 try
02914 {
02915 ACE_UNUSED_ARG (is_met);
02916 ACE_NEW_RETURN (this->stream_ctrl_,
02917 TAO_StreamCtrl,
02918 0);
02919 AVStreams::MMDevice_var mmdevice = this->_this ();
02920 this->stream_ctrl_->bind_devs (peer_device,
02921 mmdevice.in (),
02922 the_qos,
02923 the_spec);
02924 streamctrl = this->stream_ctrl_->_this ();
02925 }
02926 catch (const CORBA::Exception& ex)
02927 {
02928 ex._tao_print_exception ("TAO_MMDevice::bind");
02929 return streamctrl;
02930 }
02931 return streamctrl;
02932 }
02933
02934
02935 AVStreams::StreamCtrl_ptr
02936 TAO_MMDevice::bind_mcast (AVStreams::MMDevice_ptr first_peer,
02937 AVStreams::streamQoS &the_qos,
02938 CORBA::Boolean_out is_met,
02939 const AVStreams::flowSpec &the_spec)
02940 {
02941 ACE_UNUSED_ARG (first_peer);
02942 ACE_UNUSED_ARG (the_qos);
02943 ACE_UNUSED_ARG (is_met);
02944 ACE_UNUSED_ARG (the_spec);
02945
02946 return 0;
02947 }
02948
02949 AVStreams::StreamEndPoint_ptr
02950 TAO_MMDevice::create_A_B (MMDevice_Type type,
02951 AVStreams::StreamCtrl_ptr streamctrl,
02952 AVStreams::VDev_out the_vdev,
02953 AVStreams::streamQoS &the_qos,
02954 CORBA::Boolean_out met_qos,
02955 char *&,
02956 const AVStreams::flowSpec &flow_spec)
02957 {
02958 AVStreams::StreamEndPoint_A_ptr sep_a (AVStreams::StreamEndPoint_A::_nil ());
02959 AVStreams::StreamEndPoint_B_ptr sep_b (AVStreams::StreamEndPoint_B::_nil ());
02960 AVStreams::StreamEndPoint_ptr sep (AVStreams::StreamEndPoint::_nil ());
02961 try
02962 {
02963 switch (type)
02964 {
02965 case MMDEVICE_A:
02966 {
02967 if (this->endpoint_strategy_->create_A (sep_a,
02968 the_vdev) == -1)
02969 ACE_ERROR_RETURN ((LM_ERROR,
02970 "TAO_MMDevice::create_A_B (%P|%t) - "
02971 "error in create_A\n"),
02972 0);
02973 sep = sep_a;
02974 }
02975 break;
02976 case MMDEVICE_B:
02977 {
02978 if (this->endpoint_strategy_->create_B (sep_b,
02979 the_vdev) == -1)
02980 ACE_ERROR_RETURN ((LM_ERROR,
02981 "TAO_MMDevice::create_A_B (%P|%t) - "
02982 "error in create_B\n"),
02983 0);
02984 sep = sep_b;
02985 }
02986 break;
02987 default:
02988 break;
02989 }
02990 if (this->fdev_map_.current_size () > 0)
02991 {
02992 TAO_AV_QoS qos (the_qos);
02993
02994 for (u_int i=0;i<flow_spec.length ();i++)
02995 {
02996 TAO_Forward_FlowSpec_Entry forward_entry;
02997 forward_entry.parse (flow_spec[i]);
02998 ACE_CString flow_key (forward_entry.flowname ());
02999 AVStreams::FDev_var flow_dev;
03000 AVStreams::FlowConnection_var flowconnection;
03001 try
03002 {
03003
03004
03005 CORBA::Object_var flowconnection_obj =
03006 streamctrl->get_flow_connection (forward_entry.flowname ());
03007 ACE_OS::printf("successfully called get_flow_connection\n");
03008 if (!CORBA::is_nil (flowconnection_obj.in ()))
03009 {
03010 flowconnection = AVStreams::FlowConnection::_narrow (flowconnection_obj.in ());
03011 }
03012 }
03013 catch (const AVStreams::noSuchFlow&)
03014 {
03015 TAO_FlowConnection *flowConnection;
03016 ACE_NEW_RETURN (flowConnection,
03017 TAO_FlowConnection,
03018 0);
03019 flowconnection = flowConnection->_this ();
03020 streamctrl->set_flow_connection (forward_entry.flowname(),
03021 flowconnection.in ());
03022 }
03023 catch (const CORBA::Exception& ex)
03024 {
03025
03026 ex._tao_print_exception (
03027 "TAO_MMDevice::create_a::get_flow_connection");
03028 }
03029
03030 int result = this->fdev_map_.find (flow_key, flow_dev);
03031 if (result < 0)
03032 ACE_ERROR_RETURN ((LM_ERROR, "(%N,%l) fdev_map::find failed\n"), 0);
03033
03034 CORBA::String_var named_fdev;
03035 AVStreams::FlowEndPoint_var flow_endpoint;
03036 AVStreams::QoS flow_qos;
03037 result = qos.get_flow_qos (forward_entry.flowname (), flow_qos);
03038 if (result < 0)
03039 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) get_flow_qos failed for %s\n", forward_entry.flowname ()));
03040 switch (forward_entry.direction ())
03041 {
03042 case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
03043 {
03044 switch (type)
03045 {
03046 case MMDEVICE_A:
03047 {
03048
03049
03050
03051 flow_endpoint =
03052 flow_dev->create_producer (flowconnection.in (),
03053 flow_qos,
03054 met_qos,
03055 named_fdev.inout ());
03056 }
03057 break;
03058 case MMDEVICE_B:
03059 {
03060 flow_endpoint =
03061 flow_dev->create_consumer (flowconnection.in (),
03062 flow_qos,
03063 met_qos,
03064 named_fdev.inout ());
03065 }
03066 break;
03067 }
03068 }
03069 break;
03070 case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
03071 {
03072 switch (type)
03073 {
03074 case MMDEVICE_A:
03075 {
03076
03077
03078
03079 flow_endpoint =
03080 flow_dev->create_consumer (flowconnection.in (),
03081 flow_qos,
03082 met_qos,
03083 named_fdev.inout ());
03084 }
03085 break;
03086 case MMDEVICE_B:
03087 {
03088
03089
03090
03091 flow_endpoint =
03092 flow_dev->create_producer (flowconnection.in (),
03093 flow_qos,
03094 met_qos,
03095 named_fdev.inout ());
03096 }
03097 break;
03098 }
03099 }
03100 break;
03101 default:
03102 break;
03103 }
03104 CORBA::Any flowname_any;
03105 flowname_any <<= forward_entry.flowname ();
03106 flow_endpoint->define_property ("FlowName", flowname_any);
03107 sep->add_fep (flow_endpoint.in ());
03108 }
03109 }
03110 }
03111 catch (const CORBA::Exception& ex)
03112 {
03113 ex._tao_print_exception ("TAO_MMDevice::create_A");
03114 return sep;
03115 }
03116 return sep;
03117 }
03118
03119 AVStreams::StreamEndPoint_A_ptr
03120 TAO_MMDevice::create_A (AVStreams::StreamCtrl_ptr streamctrl,
03121 AVStreams::VDev_out the_vdev,
03122 AVStreams::streamQoS &stream_qos,
03123 CORBA::Boolean_out met_qos,
03124 char *&named_vdev,
03125 const AVStreams::flowSpec &flow_spec)
03126 {
03127 AVStreams::StreamEndPoint_A_ptr sep_a = 0;
03128 AVStreams::StreamEndPoint_var sep;
03129 try
03130 {
03131 sep = this->create_A_B (MMDEVICE_A, streamctrl, the_vdev, stream_qos, met_qos, named_vdev, flow_spec);
03132 sep_a = AVStreams::StreamEndPoint_A::_narrow (sep.in());
03133
03134 ACE_ASSERT( !CORBA::is_nil( sep_a ) );
03135 }
03136 catch (const CORBA::Exception& ex)
03137 {
03138 ex._tao_print_exception ("TAO_MMDevice::create_A");
03139 return sep_a;
03140 }
03141
03142 return sep_a;
03143 }
03144
03145
03146 AVStreams::StreamEndPoint_B_ptr
03147 TAO_MMDevice::create_B (AVStreams::StreamCtrl_ptr streamctrl,
03148 AVStreams::VDev_out the_vdev,
03149 AVStreams::streamQoS &stream_qos,
03150 CORBA::Boolean_out met_qos,
03151 char *&named_vdev,
03152 const AVStreams::flowSpec &flow_spec)
03153 {
03154 AVStreams::StreamEndPoint_B_ptr sep_b = AVStreams::StreamEndPoint_B::_nil ();
03155 AVStreams::StreamEndPoint_var sep;
03156
03157 try
03158 {
03159 sep = this->create_A_B (MMDEVICE_B, streamctrl, the_vdev, stream_qos, met_qos, named_vdev, flow_spec);
03160 sep_b = AVStreams::StreamEndPoint_B::_narrow (sep.in());
03161
03162 ACE_ASSERT ( !CORBA::is_nil( sep_b ) );
03163 }
03164 catch (const CORBA::Exception& ex)
03165 {
03166 ex._tao_print_exception ("TAO_MMDevice::create_B");
03167 return sep_b;
03168 }
03169 return sep_b;
03170 }
03171
03172
03173
03174 void
03175 TAO_MMDevice::destroy (AVStreams::StreamEndPoint_ptr ,
03176 const char * )
03177 {
03178
03179
03180
03181 int result = TAO_AV_Core::deactivate_servant (this);
03182 if (result < 0)
03183 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_MMDevice::destroy failed\n"));
03184 }
03185
03186 char *
03187 TAO_MMDevice::add_fdev_i (AVStreams::FDev_ptr fdev)
03188 {
03189 char* tmp;
03190 ACE_NEW_RETURN (tmp,
03191 char[64],
03192 0);
03193 CORBA::String_var flow_name = tmp;
03194
03195 try
03196 {
03197
03198
03199 ACE_OS::sprintf (tmp, "flow%d", flow_num_++);
03200 CORBA::Any flowname_any;
03201 flowname_any <<= flow_name.in ();
03202 fdev->define_property ("Flow", flowname_any);
03203 }
03204 catch (const CORBA::Exception& ex)
03205 {
03206 ex._tao_print_exception ("TAO_MMDevice::add_fdev");
03207 return 0;
03208 }
03209 return flow_name._retn ();
03210 }
03211
03212
03213 char *
03214 TAO_MMDevice::add_fdev (CORBA::Object_ptr fdev_obj)
03215 {
03216 CORBA::String_var flow_name;
03217 AVStreams::FDev_var fdev;
03218 try
03219 {
03220 CORBA::Any_ptr flow_name_any;
03221 fdev = AVStreams::FDev::_narrow (fdev_obj);
03222
03223 if (CORBA::is_nil (fdev.in ()))
03224 return 0;
03225
03226
03227 flow_name_any = fdev->get_property_value ("Flow");
03228
03229 const char *tmp;
03230 *flow_name_any >>= tmp;
03231 flow_name = CORBA::string_dup (tmp);
03232 }
03233 catch (const CORBA::Exception&)
03234 {
03235 flow_name =
03236 this->add_fdev_i (fdev.in ());
03237 }
03238
03239
03240
03241
03242 ACE_CString fdev_name_key ( flow_name.in () );
03243
03244
03245 if ( (this->fdev_map_.bind (fdev_name_key, fdev )) != 0)
03246 throw AVStreams::streamOpFailed ();
03247
03248 this->flow_count_++;
03249 this->flows_.length (this->flow_count_);
03250 this->flows_ [this->flow_count_-1] = flow_name;
03251
03252 CORBA::Any flows_any;
03253 flows_any <<= this->flows_;
03254 try
03255 {
03256 this->define_property ("Flows",
03257 flows_any);
03258 }
03259 catch (const CORBA::Exception& ex)
03260 {
03261 ex._tao_print_exception ("TAO_MMDevice::add_fdev");
03262 return 0;
03263 }
03264 return flow_name._retn ();
03265 }
03266
03267
03268 CORBA::Object_ptr
03269 TAO_MMDevice::get_fdev (const char *flow_name)
03270 {
03271
03272 ACE_CString fdev_name_key (flow_name);
03273 AVStreams::FDev_var fdev_entry;
03274 if (this->fdev_map_.find (fdev_name_key, fdev_entry) == 0)
03275 return fdev_entry._retn() ;
03276 return 0;
03277 }
03278
03279
03280 void
03281 TAO_MMDevice::remove_fdev (const char *flow_name)
03282 {
03283 try
03284 {
03285 ACE_CString fdev_name_key (flow_name);
03286 AVStreams::FDev_var fdev_entry;
03287
03288 if (this->fdev_map_.unbind (fdev_name_key, fdev_entry)!= 0)
03289 throw AVStreams::streamOpFailed ();
03290
03291 AVStreams::flowSpec new_flows (this->flows_.length ());
03292 for (u_int i=0, j=0 ; i <this->flows_.length (); i++)
03293 if (ACE_OS::strcmp (flow_name, this->flows_[i]) != 0)
03294 new_flows[j++] = this->flows_[i];
03295
03296 CORBA::Any flows;
03297 flows <<= new_flows;
03298 this->flows_ = new_flows;
03299 this->define_property ("Flows",
03300 flows);
03301 }
03302 catch (const CORBA::Exception& ex)
03303 {
03304 ex._tao_print_exception ("TAO_MMDevice::remove_fdev");
03305 }
03306 }
03307
03308
03309 TAO_MMDevice::~TAO_MMDevice (void)
03310 {
03311 delete this->stream_ctrl_;
03312 }
03313
03314
03315
03316
03317
03318
03319 TAO_FlowConnection::TAO_FlowConnection (void)
03320 :fp_name_ (CORBA::string_dup ("")),
03321 ip_multicast_ (0)
03322 {
03323 }
03324
03325
03326
03327
03328
03329
03330
03331
03332
03333 int
03334 TAO_FlowConnection::set_mcast_addr (ACE_CString mcast_addr, u_short mcast_port)
03335 {
03336 this->mcast_addr_ = mcast_addr;
03337 this->mcast_port_ = mcast_port;
03338 return 0;
03339 }
03340
03341 void
03342 TAO_FlowConnection::set_protocol (const char *protocol)
03343 {
03344 this->protocol_ = protocol;
03345 }
03346
03347
03348 void
03349 TAO_FlowConnection::stop (void)
03350 {
03351 try
03352 {
03353 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03354 ();
03355 for (FlowProducer_SetItor producer_end =
03356 this->flow_producer_set_.end ();
03357 producer_begin != producer_end; ++producer_begin)
03358 {
03359 (*producer_begin)->stop ();
03360 }
03361 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03362 ();
03363 for (FlowConsumer_SetItor consumer_end =
03364 this->flow_consumer_set_.end ();
03365 consumer_begin != consumer_end; ++consumer_begin)
03366 {
03367 (*consumer_begin)->stop ();
03368 }
03369 }
03370 catch (const CORBA::Exception& ex)
03371 {
03372 ex._tao_print_exception ("TAO_FlowConnection::stop");
03373 return;
03374 }
03375 }
03376
03377
03378 void
03379 TAO_FlowConnection::start (void)
03380 {
03381 try
03382 {
03383 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03384 ();
03385 for (FlowConsumer_SetItor consumer_end =
03386 this->flow_consumer_set_.end ();
03387 consumer_begin != consumer_end; ++consumer_begin)
03388 {
03389 (*consumer_begin)->start ();
03390 }
03391 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03392 ();
03393 for (FlowProducer_SetItor producer_end =
03394 this->flow_producer_set_.end ();
03395 producer_begin != producer_end; ++producer_begin)
03396 {
03397 (*producer_begin)->start ();
03398 }
03399 }
03400 catch (const CORBA::Exception& ex)
03401 {
03402 ex._tao_print_exception ("TAO_FlowConnection::start");
03403 return;
03404 }
03405 }
03406
03407
03408 void
03409 TAO_FlowConnection::destroy (void)
03410 {
03411 try
03412 {
03413 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03414 ();
03415 for (FlowProducer_SetItor producer_end =
03416 this->flow_producer_set_.end ();
03417 producer_begin != producer_end; ++producer_begin)
03418 {
03419 (*producer_begin)->destroy ();
03420 }
03421 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03422 ();
03423 for (FlowConsumer_SetItor consumer_end =
03424 this->flow_consumer_set_.end ();
03425 consumer_begin != consumer_end; ++consumer_begin)
03426 {
03427 (*consumer_begin)->destroy ();
03428 }
03429 }
03430 catch (const CORBA::Exception& ex)
03431 {
03432 ex._tao_print_exception ("TAO_FlowConnection::destroy");
03433 return;
03434 }
03435 int result = TAO_AV_Core::deactivate_servant (this);
03436 if (result < 0)
03437 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::destroy failed\n"));
03438 }
03439
03440
03441 CORBA::Boolean
03442 TAO_FlowConnection::modify_QoS (AVStreams::QoS & new_qos)
03443 {
03444 ACE_UNUSED_ARG (new_qos);
03445 return 0;
03446 }
03447
03448
03449 CORBA::Boolean
03450 TAO_FlowConnection::use_flow_protocol (const char * fp_name,
03451 const CORBA::Any & fp_settings)
03452 {
03453 this->fp_name_ = fp_name;
03454 this->fp_settings_ = fp_settings;
03455 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03456 ();
03457 for (FlowProducer_SetItor producer_end =
03458 this->flow_producer_set_.end ();
03459 producer_begin != producer_end; ++producer_begin)
03460 {
03461 (*producer_begin)->use_flow_protocol
03462 (fp_name, fp_settings);
03463 }
03464 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03465 ();
03466 for (FlowConsumer_SetItor consumer_end =
03467 this->flow_consumer_set_.end ();
03468 consumer_begin != consumer_end; ++consumer_begin)
03469 {
03470 (*consumer_begin)->use_flow_protocol
03471 (fp_name, fp_settings);
03472 }
03473 return 1;
03474 }
03475
03476 void
03477 TAO_FlowConnection::push_event (const AVStreams::streamEvent & the_event)
03478 {
03479 ACE_UNUSED_ARG (the_event);
03480 }
03481
03482 CORBA::Boolean
03483 TAO_FlowConnection::connect_devs (AVStreams::FDev_ptr a_party,
03484 AVStreams::FDev_ptr b_party,
03485 AVStreams::QoS & flow_qos)
03486 {
03487 CORBA::Boolean result = 0;
03488 try
03489 {
03490 AVStreams::FlowConnection_var flowconnection = this->_this ();
03491 CORBA::Boolean met_qos;
03492 CORBA::String_var named_fdev ((const char *)"");
03493 AVStreams::FlowProducer_var producer =
03494 a_party->create_producer (flowconnection.in (),
03495 flow_qos,
03496 met_qos,
03497 named_fdev.inout ());
03498 AVStreams::FlowConsumer_var consumer =
03499 b_party->create_consumer (flowconnection.in (),
03500 flow_qos,
03501 met_qos,
03502 named_fdev.inout ());
03503 result = this->connect (producer.in (),
03504 consumer.in (),
03505 flow_qos);
03506 }
03507 catch (const CORBA::Exception& ex)
03508 {
03509 ex._tao_print_exception (
03510 "TAO_FlowConnection::connect_devs");
03511 return 0;
03512 }
03513 return result;
03514 }
03515
03516
03517 CORBA::Boolean
03518 TAO_FlowConnection::connect (AVStreams::FlowProducer_ptr producer,
03519 AVStreams::FlowConsumer_ptr consumer,
03520 AVStreams::QoS & the_qos)
03521 {
03522 try
03523 {
03524
03525 AVStreams::FlowProducer_ptr flow_producer =
03526 AVStreams::FlowProducer::_duplicate (producer);
03527 AVStreams::FlowConsumer_ptr flow_consumer =
03528 AVStreams::FlowConsumer::_duplicate (consumer);
03529
03530 this->flow_producer_set_.insert (flow_producer);
03531 this->flow_consumer_set_.insert (flow_consumer);
03532 AVStreams::FlowConnection_var flowconnection =
03533 this->_this ();
03534
03535 flow_producer->set_peer (flowconnection.in (),
03536 flow_consumer,
03537 the_qos);
03538
03539 flow_consumer->set_peer (flowconnection.in (),
03540 flow_producer,
03541 the_qos);
03542
03543 char *consumer_address =
03544 flow_consumer->go_to_listen (the_qos,
03545 0,
03546 flow_producer,
03547 this->fp_name_.inout ());
03548
03549 if (ACE_OS::strcmp (consumer_address, "") == 0)
03550 {
03551
03552 consumer_address = flow_producer->go_to_listen (the_qos,
03553 0,
03554 flow_consumer,
03555 this->fp_name_.inout ());
03556 flow_consumer->connect_to_peer (the_qos,
03557 consumer_address,
03558 this->fp_name_.inout ());
03559
03560
03561 }
03562 else
03563 {
03564 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::connect_to_peer addres: %s", consumer_address));
03565 flow_producer->connect_to_peer (the_qos,
03566 consumer_address,
03567 this->fp_name_.inout ());
03568 }
03569 }
03570 catch (const CORBA::Exception& ex)
03571 {
03572 ex._tao_print_exception ("TAO_FlowConnection::connect");
03573 }
03574 return 1;
03575 }
03576
03577
03578 CORBA::Boolean
03579 TAO_FlowConnection::disconnect (void)
03580 {
03581 return 0;
03582 }
03583
03584 CORBA::Boolean
03585 TAO_FlowConnection::add_producer (AVStreams::FlowProducer_ptr producer,
03586 AVStreams::QoS & the_qos)
03587 {
03588 try
03589 {
03590 AVStreams::FlowProducer_ptr flow_producer =
03591 AVStreams::FlowProducer::_duplicate (producer);
03592
03593
03594
03595
03596
03597 FlowProducer_SetItor begin = this->flow_producer_set_.begin ();
03598 FlowProducer_SetItor end = this->flow_producer_set_.end ();
03599 for (; begin != end; ++begin)
03600 {
03601 if ((*begin)->_is_equivalent (producer))
03602
03603 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
03604 }
03605
03606
03607
03608
03609 int result = this->flow_producer_set_.insert (flow_producer);
03610 if (result == 1)
03611 {
03612
03613 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
03614 }
03615 CORBA::Boolean met_qos;
03616 char mcast_address[BUFSIZ];
03617 if (this->producer_address_.in () == 0)
03618 {
03619 ACE_INET_Addr mcast_addr;
03620 mcast_addr.set (this->mcast_port_,
03621 this->mcast_addr_.c_str ()
03622 );
03623
03624 char buf [BUFSIZ];
03625 mcast_addr.addr_to_string (buf, BUFSIZ);
03626 ACE_OS::sprintf (mcast_address, "%s=%s", this->protocol_.in (), buf);
03627 }
03628 else
03629 {
03630 ACE_OS::strcpy (mcast_address, this->producer_address_.in ());
03631 }
03632 char *address = flow_producer->connect_mcast (the_qos,
03633 met_qos,
03634 mcast_address,
03635 this->fp_name_.in ());
03636
03637 if (this->producer_address_.in () == 0)
03638 {
03639 TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address);
03640 if (entry.address () != 0)
03641 {
03642
03643 this->producer_address_ = address;
03644 }
03645 else
03646 {
03647
03648 this->ip_multicast_ = 0;
03649 }
03650 }
03651
03652 if (CORBA::is_nil (this->mcastconfigif_.in ()))
03653 {
03654 ACE_NEW_RETURN (this->mcastconfigif_i_,
03655 TAO_MCastConfigIf,
03656 0);
03657 this->mcastconfigif_ = this->mcastconfigif_i_->_this ();
03658 }
03659 AVStreams::FlowConnection_var flowconnection = this->_this ();
03660 flow_producer->set_Mcast_peer (flowconnection.in (),
03661 this->mcastconfigif_.in (),
03662 the_qos);
03663 }
03664 catch (const CORBA::Exception& ex)
03665 {
03666 ex._tao_print_exception (
03667 "TAO_FlowConnection::add_producer");
03668 return 0;
03669 }
03670 return 1;
03671 }
03672
03673 CORBA::Boolean
03674 TAO_FlowConnection::add_consumer (AVStreams::FlowConsumer_ptr consumer,
03675 AVStreams::QoS & the_qos)
03676 {
03677 try
03678 {
03679 AVStreams::FlowConsumer_ptr flow_consumer =
03680 AVStreams::FlowConsumer::_duplicate (consumer);
03681 FlowConsumer_SetItor begin = this->flow_consumer_set_.begin ();
03682 FlowConsumer_SetItor end = this->flow_consumer_set_.end ();
03683 for (; begin != end; ++begin)
03684 {
03685 if ((*begin)->_is_equivalent (consumer))
03686
03687 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_Consumer: Consumer already exists\n"), 1);
03688 }
03689 int result = this->flow_consumer_set_.insert (flow_consumer);
03690 if (result == 1)
03691 {
03692
03693 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_consumer: consumer already exists\n"), 1);
03694 }
03695
03696 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin ();
03697
03698
03699
03700
03701 AVStreams::FlowProducer_ptr flow_producer = (*producer_begin);
03702
03703 AVStreams::protocolSpec protocols (1);
03704 protocols.length (1);
03705 protocols [0] = CORBA::string_dup (this->producer_address_.in ());
03706
03707 if (!this->ip_multicast_)
03708 {
03709 flow_consumer->set_protocol_restriction (protocols);
03710 char * address =
03711 flow_consumer->go_to_listen (the_qos,
03712 1,
03713 flow_producer,
03714 this->fp_name_.inout ());
03715 CORBA::Boolean is_met;
03716 flow_producer->connect_mcast (the_qos,
03717 is_met,
03718 address,
03719 this->fp_name_.inout ());
03720 }
03721 else
03722 {
03723
03724
03725
03726
03727
03728 flow_consumer->connect_to_peer (the_qos,
03729 this->producer_address_.in (),
03730 this->fp_name_.inout ());
03731
03732
03733
03734
03735
03736
03737
03738
03739 }
03740 if (CORBA::is_nil (this->mcastconfigif_.in ()))
03741 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowConnection::add_consumer: first add a producer and then a consumer\n"), 0);
03742
03743 AVStreams::flowSpec flow_spec;
03744 AVStreams::streamQoS stream_qos (1);
03745 stream_qos.length (1);
03746 stream_qos [0] = the_qos;
03747 this->mcastconfigif_->set_peer (flow_consumer,
03748 stream_qos,
03749 flow_spec);
03750 }
03751 catch (const CORBA::Exception& ex)
03752 {
03753 ex._tao_print_exception (
03754 "TAO_FlowConnection::add_consumer");
03755 return 0;
03756 }
03757 return 1;
03758 }
03759
03760 CORBA::Boolean
03761 TAO_FlowConnection::drop (AVStreams::FlowEndPoint_ptr target)
03762 {
03763 ACE_UNUSED_ARG (target);
03764 return 0;
03765 }
03766
03767
03768
03769
03770
03771
03772 TAO_FlowEndPoint::TAO_FlowEndPoint (void)
03773 :lock_ (0)
03774 {
03775 }
03776
03777 TAO_FlowEndPoint::TAO_FlowEndPoint (const char *flowname,
03778 AVStreams::protocolSpec &protocols,
03779 const char *format)
03780 {
03781 this->open (flowname, protocols, format);
03782 }
03783
03784 void
03785 TAO_FlowEndPoint::set_flow_handler (const char * ,
03786 TAO_AV_Flow_Handler * )
03787 {
03788 }
03789
03790 int
03791 TAO_FlowEndPoint::open (const char *flowname,
03792 AVStreams::protocolSpec &protocols,
03793 const char *format)
03794 {
03795 this->flowname_ = flowname;
03796 this->format_ = format;
03797
03798 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowEndPoint::open\n"));
03799 try
03800 {
03801 CORBA::Any flowname_any;
03802 flowname_any <<= flowname;
03803 this->define_property ("FlowName",
03804 flowname_any);
03805 this->set_format (format);
03806 this->protocol_addresses_ = protocols;
03807 AVStreams::protocolSpec protocol_spec (protocols.length ());
03808 protocol_spec.length (protocols.length ());
03809 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
03810 for (u_int i=0;i<protocols.length ();i++)
03811 {
03812 CORBA::String_var address = CORBA::string_dup (protocols [i]);
03813 TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address.in ());
03814 protocol_spec [i] = CORBA::string_dup (entry.carrier_protocol_str ());
03815 if (TAO_debug_level > 0)
03816 ACE_DEBUG ((LM_DEBUG,
03817 "[%s]\n",
03818 static_cast<char const*>(protocol_spec[i])));
03819 }
03820 this->set_protocol_restriction (protocol_spec);
03821 }
03822 catch (const CORBA::Exception& ex)
03823 {
03824 ex._tao_print_exception ("TAO_FlowEndPoint::open");
03825 return -1;
03826 }
03827 return 0;
03828 }
03829
03830
03831 int
03832 TAO_FlowEndPoint::set_flowname (const char *flowname)
03833 {
03834 this->flowname_ = flowname;
03835 return 0;
03836 }
03837
03838
03839
03840 CORBA::Boolean
03841 TAO_FlowEndPoint::lock (void)
03842 {
03843
03844
03845 if (this->lock_)
03846 return 0;
03847 this->lock_ = 1;
03848 return 1;
03849 }
03850
03851
03852 void
03853 TAO_FlowEndPoint::unlock (void)
03854 {
03855 this->lock_ = 0;
03856 }
03857
03858
03859 void
03860 TAO_FlowEndPoint::destroy (void)
03861 {
03862 int result = TAO_AV_Core::deactivate_servant (this);
03863 if (result < 0)
03864 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::destroy failed\n"));
03865 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
03866 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
03867 begin != end; ++begin)
03868 (*begin)->protocol_object ()->destroy ();
03869 }
03870
03871 AVStreams::StreamEndPoint_ptr
03872 TAO_FlowEndPoint::related_sep (void)
03873 {
03874
03875 return AVStreams::StreamEndPoint::_duplicate (this->related_sep_.in ());
03876 }
03877
03878 void
03879 TAO_FlowEndPoint::related_sep (AVStreams::StreamEndPoint_ptr related_sep)
03880 {
03881 this->related_sep_ = AVStreams::StreamEndPoint::_duplicate (related_sep);
03882 }
03883
03884 AVStreams::FlowConnection_ptr
03885 TAO_FlowEndPoint::related_flow_connection (void)
03886 {
03887 return AVStreams::FlowConnection::_duplicate (this->related_flow_connection_.in ());
03888 }
03889
03890 void
03891 TAO_FlowEndPoint::related_flow_connection (AVStreams::FlowConnection_ptr related_flow_connection)
03892 {
03893 this->related_flow_connection_ = AVStreams::FlowConnection::_duplicate (related_flow_connection);
03894 }
03895
03896
03897 AVStreams::FlowEndPoint_ptr
03898 TAO_FlowEndPoint::get_connected_fep (void)
03899 {
03900 return AVStreams::FlowEndPoint::_duplicate (this->peer_fep_.in ());
03901 }
03902
03903 CORBA::Boolean
03904 TAO_FlowEndPoint::use_flow_protocol (const char * fp_name,
03905 const CORBA::Any &)
03906 {
03907 try
03908 {
03909
03910 CORBA::Any flowname_property;
03911 flowname_property <<= fp_name;
03912 this->define_property ("FlowProtocol",
03913 flowname_property);
03914 }
03915 catch (const CORBA::Exception& ex)
03916 {
03917 ex._tao_print_exception (
03918 "TAO_FlowEndPoint::use_flow_protocol");
03919 return 0;
03920 }
03921 return 1;
03922 }
03923
03924 void
03925 TAO_FlowEndPoint::set_format (const char * format)
03926 {
03927 this->format_ = format;
03928 try
03929 {
03930
03931
03932 CORBA::Any format_val;
03933 format_val <<= format;
03934 this->define_property ("Format",
03935 format_val);
03936 }
03937 catch (const CORBA::Exception& ex)
03938 {
03939 ex._tao_print_exception ("TAO_FlowEndpoint::set_format");
03940 }
03941 }
03942
03943 void
03944 TAO_FlowEndPoint::set_dev_params (const CosPropertyService::Properties & new_settings)
03945 {
03946 this->dev_params_ = new_settings;
03947 try
03948 {
03949 CORBA::Any DevParams_property;
03950 DevParams_property <<= new_settings;
03951 this->define_property ("DevParams",
03952 DevParams_property);
03953 }
03954 catch (const CORBA::Exception& ex)
03955 {
03956 ex._tao_print_exception (
03957 "TAO_FlowEndPoint::set_dev_params");
03958 }
03959 }
03960
03961 void
03962 TAO_FlowEndPoint::set_protocol_restriction (const AVStreams::protocolSpec & protocols)
03963 {
03964 try
03965 {
03966 u_int i = 0;
03967 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
03968 for (i=0;i<protocols.length ();i++)
03969 {
03970 const char *protocol = (protocols)[i];
03971 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", protocol));
03972 }
03973 CORBA::Any AvailableProtocols_property;
03974 AvailableProtocols_property <<= protocols;
03975 this->define_property ("AvailableProtocols",
03976 AvailableProtocols_property);
03977 AVStreams::protocolSpec *temp_spec;
03978 CORBA::Any_var temp_any = this->get_property_value ("AvailableProtocols");
03979 temp_any.in () >>= temp_spec;
03980 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
03981 for (i=0;i<temp_spec->length ();i++)
03982 {
03983 const char *protocol = (*temp_spec)[i];
03984 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", protocol));
03985 }
03986 this->protocols_ = protocols;
03987 }
03988 catch (const CORBA::Exception& ex)
03989 {
03990 ex._tao_print_exception (
03991 "TAO_FlowEndpoint::set_protocol_restriction");
03992 }
03993 }
03994
03995 CORBA::Boolean
03996 TAO_FlowEndPoint::is_fep_compatible (AVStreams::FlowEndPoint_ptr peer_fep)
03997 {
03998 const char *exception_message = "";
03999 try
04000 {
04001
04002
04003
04004 CORBA::Any_var format_ptr;
04005 CORBA::String_var my_format, peer_format;
04006
04007 exception_message = "TAO_FlowEndPoint::is_fep_compatible - Format";
04008 format_ptr = this->get_property_value ("Format");
04009
04010 const char *temp_format;
04011 format_ptr.in () >>= temp_format;
04012 my_format = CORBA::string_dup (temp_format);
04013
04014 exception_message = "TAO_FlowEndPoint::is_fep_compatible - Format[2]";
04015 format_ptr = peer_fep->get_property_value ("Format");
04016 format_ptr.in () >>= temp_format;
04017 peer_format = CORBA::string_dup (temp_format);
04018 if (ACE_OS::strcmp (my_format.in (),
04019 peer_format.in ()) != 0)
04020 return 0;
04021
04022
04023 CORBA::Any_var AvailableProtocols_ptr;
04024 AVStreams::protocolSpec my_protocol_spec, peer_protocol_spec;
04025 AVStreams::protocolSpec *temp_protocols;;
04026
04027 exception_message =
04028 "TAO_FlowEndPoint::is_fep_compatible - AvailableProtocols";
04029 AvailableProtocols_ptr = this->get_property_value ("AvailableProtocols");
04030 AvailableProtocols_ptr.in () >>= temp_protocols;
04031 my_protocol_spec = *temp_protocols;
04032
04033 exception_message =
04034 "TAO_FlowEndPoint::is_fep_compatible - AvailableProtocols[2]";
04035 AvailableProtocols_ptr = peer_fep->get_property_value ("AvailableProtocols");
04036 AvailableProtocols_ptr.in () >>= temp_protocols;
04037 peer_protocol_spec = *temp_protocols;
04038
04039 int protocol_match = 0;
04040 for (u_int i=0;i<my_protocol_spec.length ();i++)
04041 {
04042 CORBA::String_var my_protocol_string;
04043 for (u_int j=0;j<peer_protocol_spec.length ();j++)
04044 {
04045 CORBA::String_var peer_protocol_string;
04046 my_protocol_string = CORBA::string_dup (my_protocol_spec[i]);
04047 peer_protocol_string = CORBA::string_dup (peer_protocol_spec[j]);
04048 if (ACE_OS::strcmp (my_protocol_string.in (), peer_protocol_string.in ()) == 0)
04049 {
04050 protocol_match = 1;
04051 break;
04052 }
04053 }
04054 if (protocol_match)
04055 break;
04056 }
04057 if (!protocol_match)
04058 return 0;
04059 }
04060 catch (const CosPropertyService::PropertyNotFound& nf)
04061 {
04062 nf._tao_print_exception (exception_message);
04063 }
04064 catch (const CORBA::Exception& ex)
04065 {
04066 ex._tao_print_exception ("TAO_FlowEndPoint::is_fep_compatible");
04067 return 0;
04068 }
04069 return 1;
04070 }
04071
04072 CORBA::Boolean
04073 TAO_FlowEndPoint::set_peer (AVStreams::FlowConnection_ptr ,
04074 AVStreams::FlowEndPoint_ptr the_peer_fep,
04075 AVStreams::QoS & )
04076 {
04077 this->peer_fep_ =
04078 AVStreams::FlowEndPoint::_duplicate (the_peer_fep);
04079 return 1;
04080 }
04081
04082 CORBA::Boolean
04083 TAO_FlowEndPoint::set_Mcast_peer (AVStreams::FlowConnection_ptr ,
04084 AVStreams::MCastConfigIf_ptr mcast_peer,
04085 AVStreams::QoS & )
04086 {
04087 this->mcast_peer_ = AVStreams::MCastConfigIf::_duplicate (mcast_peer);
04088 return 0;
04089 }
04090
04091 char *
04092 TAO_FlowEndPoint::go_to_listen_i (TAO_FlowSpec_Entry::Role role,
04093 AVStreams::QoS & ,
04094 CORBA::Boolean ,
04095 AVStreams::FlowEndPoint_ptr peer_fep,
04096 char *& flowProtocol)
04097 {
04098 char direction [BUFSIZ];
04099 switch (role)
04100 {
04101 case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
04102 ACE_OS::strcpy (direction, "IN");
04103 break;
04104 case TAO_FlowSpec_Entry::TAO_AV_CONSUMER:
04105 ACE_OS::strcpy (direction, "OUT");
04106 break;
04107 default:
04108 break;
04109 }
04110 AVStreams::protocolSpec my_protocol_spec, peer_protocol_spec;
04111 AVStreams::protocolSpec *temp_protocols;
04112 CORBA::Any_var AvailableProtocols_ptr =
04113 peer_fep->get_property_value ("AvailableProtocols");
04114 AvailableProtocols_ptr.in () >>= temp_protocols;
04115 peer_protocol_spec = *temp_protocols;
04116 AvailableProtocols_ptr =
04117 this->get_property_value ("AvailableProtocols");
04118 AvailableProtocols_ptr.in () >>= temp_protocols;
04119 my_protocol_spec = *temp_protocols;
04120 int protocol_match = 0;
04121 CORBA::String_var listen_protocol;
04122 u_int i =0;
04123 for (i=0;i<my_protocol_spec.length ();i++)
04124 {
04125 CORBA::String_var my_protocol_string;
04126 for (u_int j=0;j<peer_protocol_spec.length ();j++)
04127 {
04128 CORBA::String_var peer_protocol_string;
04129 my_protocol_string = CORBA::string_dup (my_protocol_spec[i]);
04130 peer_protocol_string = CORBA::string_dup (peer_protocol_spec[j]);
04131 if (ACE_OS::strcmp (my_protocol_string.in (), peer_protocol_string.in ()) == 0)
04132 {
04133 listen_protocol = my_protocol_string;
04134 protocol_match = 1;
04135 break;
04136 }
04137 }
04138 if (protocol_match)
04139 break;
04140 }
04141 if (!protocol_match)
04142 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowEndPoint::go_to_listen failed: no protoocol match\n"), 0);
04143
04144 for (u_int j=0;j<this->protocol_addresses_.length ();j++)
04145 if (ACE_OS::strncmp (this->protocol_addresses_ [j], listen_protocol.in (), ACE_OS::strlen (listen_protocol.in ())) == 0)
04146 {
04147
04148 TAO_Forward_FlowSpec_Entry *entry;
04149 ACE_NEW_RETURN (entry,
04150 TAO_Forward_FlowSpec_Entry (this->flowname_.in (),
04151 direction,
04152 this->format_.in (),
04153 flowProtocol,
04154 this->protocol_addresses_ [j]),
04155 0);
04156
04157 TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry ();
04158 this->flow_spec_set_.insert (entry);
04159 int result = acceptor_registry->open (this,
04160 TAO_AV_CORE::instance (),
04161 this->flow_spec_set_);
04162 if (result < 0)
04163 return 0;
04164 char *listen_address = entry->get_local_addr_str ();
04165 char *address;
04166 ACE_NEW_RETURN (address,
04167 char [BUFSIZ],
04168 0);
04169 ACE_OS::sprintf (address, "%s=%s", listen_protocol.in (), listen_address);
04170 return address;
04171 }
04172 return 0;
04173 }
04174
04175
04176 CORBA::Boolean
04177 TAO_FlowEndPoint::connect_to_peer_i (TAO_FlowSpec_Entry::Role role,
04178 AVStreams::QoS & ,
04179 const char * address,
04180 const char * use_flow_protocol)
04181 {
04182 char direction [BUFSIZ];
04183 switch (role)
04184 {
04185 case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
04186 ACE_OS::strcpy (direction, "IN");
04187 break;
04188 case TAO_FlowSpec_Entry::TAO_AV_CONSUMER:
04189 ACE_OS::strcpy (direction, "OUT");
04190 break;
04191 default:
04192 break;
04193 }
04194 TAO_Forward_FlowSpec_Entry *entry;
04195 ACE_NEW_RETURN (entry,
04196 TAO_Forward_FlowSpec_Entry (this->flowname_.in (),
04197 direction,
04198 this->format_.in (),
04199 use_flow_protocol,
04200 address),
04201 0);
04202 this->flow_spec_set_.insert (entry);
04203 TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry ();
04204 int result = connector_registry->open (this,
04205 TAO_AV_CORE::instance (),
04206 this->flow_spec_set_);
04207 if (result < 0)
04208 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowEndPoint::connector_registry::open failed\n"), 0);
04209 this->reverse_channel_ = entry->get_local_addr_str ();
04210 return 1;
04211 }
04212
04213 int
04214 TAO_FlowEndPoint::set_protocol_object (const char * ,
04215 TAO_AV_Protocol_Object * )
04216 {
04217 return 0;
04218 }
04219
04220
04221
04222
04223
04224
04225
04226 TAO_FlowProducer::TAO_FlowProducer (void)
04227 {
04228 }
04229
04230 TAO_FlowProducer::TAO_FlowProducer (const char *flowname,
04231 AVStreams::protocolSpec protocols,
04232 const char *format)
04233 {
04234 this->open (flowname, protocols, format);
04235 }
04236
04237
04238 char *
04239 TAO_FlowProducer::get_rev_channel (const char * )
04240 {
04241 return 0;
04242 }
04243
04244
04245 void
04246 TAO_FlowProducer::stop (void)
04247 {
04248 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04249 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04250 begin != end; ++begin)
04251 {
04252 TAO_FlowSpec_Entry *entry = (*begin);
04253 entry->handler ()->stop (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
04254 }
04255 }
04256
04257 void
04258 TAO_FlowProducer::start (void)
04259 {
04260 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04261 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04262 begin != end; ++begin)
04263 {
04264 TAO_FlowSpec_Entry *entry = (*begin);
04265 if (entry->handler () != 0)
04266 {
04267 entry->handler ()->start (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
04268 }
04269 if (entry->control_handler () != 0)
04270 {
04271 entry->control_handler ()->start (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
04272 }
04273 }
04274 }
04275
04276 char *
04277 TAO_FlowProducer::go_to_listen (AVStreams::QoS & the_qos,
04278 CORBA::Boolean is_mcast,
04279 AVStreams::FlowEndPoint_ptr peer_fep,
04280 char *& flowProtocol)
04281 {
04282 return this->go_to_listen_i (TAO_FlowSpec_Entry::TAO_AV_PRODUCER,
04283 the_qos,
04284 is_mcast,
04285 peer_fep,
04286 flowProtocol);
04287 }
04288
04289 CORBA::Boolean
04290 TAO_FlowProducer::connect_to_peer (AVStreams::QoS & the_qos,
04291 const char * address,
04292 const char * use_flow_protocol)
04293 {
04294 return this->connect_to_peer_i (TAO_FlowSpec_Entry::TAO_AV_PRODUCER,
04295 the_qos,
04296 address,
04297 use_flow_protocol);
04298 }
04299
04300 char *
04301 TAO_FlowProducer::connect_mcast (AVStreams::QoS & ,
04302 CORBA::Boolean_out ,
04303 const char *address,
04304 const char * use_flow_protocol)
04305 {
04306
04307 for (u_int i=0;i<this->protocols_.length ();i++)
04308 {
04309
04310 }
04311
04312 if (address == 0)
04313 if (TAO_debug_level > 0)
04314 ACE_DEBUG ((LM_DEBUG, "TAO_FlowProducer::connect_mcast address is 0\n"));
04315 TAO_Forward_FlowSpec_Entry *entry;
04316 ACE_NEW_RETURN (entry,
04317 TAO_Forward_FlowSpec_Entry(this->flowname_.in (),
04318 "IN",
04319 this->format_.in (),
04320 use_flow_protocol,
04321 address),
04322 0);
04323
04324 this->flow_spec_set_.insert (entry);
04325 TAO_AV_Acceptor_Registry *acceptor_registry =
04326 TAO_AV_CORE::instance ()->acceptor_registry ();
04327 int result = acceptor_registry->open (this,
04328 TAO_AV_CORE::instance (),
04329 this->flow_spec_set_);
04330 if (result < 0)
04331 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowProducer::connect_mcast:acceptor_registry open failed\n"), 0);
04332
04333
04334 ACE_Event_Handler *event_handler = entry->handler ()->event_handler ();
04335 event_handler->reactor ()->remove_handler (event_handler,
04336 ACE_Event_Handler::READ_MASK);
04337 return CORBA::string_dup (address);
04338 }
04339
04340
04341 void
04342 TAO_FlowProducer::set_key (const AVStreams::key & the_key)
04343 {
04344 try
04345 {
04346 CORBA::Any anyval;
04347 anyval <<= the_key;
04348 this->define_property ("PublicKey",
04349 anyval);
04350 }
04351 catch (const CORBA::Exception& ex)
04352 {
04353 ex._tao_print_exception ("TAO_FlowProducer::set_key");
04354 }
04355 }
04356
04357
04358 void
04359 TAO_FlowProducer::set_source_id (CORBA::Long source_id)
04360 {
04361 this->source_id_ = source_id;
04362 }
04363
04364
04365
04366
04367
04368
04369
04370 TAO_FlowConsumer::TAO_FlowConsumer (void)
04371 {
04372 }
04373
04374 TAO_FlowConsumer::TAO_FlowConsumer (const char *flowname,
04375 AVStreams::protocolSpec protocols,
04376 const char *format)
04377 {
04378 this->open (flowname, protocols, format);
04379 }
04380
04381
04382 void
04383 TAO_FlowConsumer::stop (void)
04384 {
04385 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04386 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04387 begin != end; ++begin)
04388 (*begin)->handler ()->stop (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
04389 }
04390
04391 void
04392 TAO_FlowConsumer::start (void)
04393 {
04394 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04395 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04396 begin != end; ++begin)
04397 {
04398 (*begin)->handler ()->start (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
04399 }
04400 }
04401
04402 char *
04403 TAO_FlowConsumer::go_to_listen (AVStreams::QoS & the_qos,
04404 CORBA::Boolean is_mcast,
04405 AVStreams::FlowEndPoint_ptr peer_fep,
04406 char *& flowProtocol)
04407 {
04408 return this->go_to_listen_i (TAO_FlowSpec_Entry::TAO_AV_CONSUMER,
04409 the_qos,
04410 is_mcast,
04411 peer_fep,
04412 flowProtocol);
04413 }
04414
04415 CORBA::Boolean
04416 TAO_FlowConsumer::connect_to_peer (AVStreams::QoS & the_qos,
04417 const char * address,
04418 const char * use_flow_protocol)
04419 {
04420 return this->connect_to_peer_i (TAO_FlowSpec_Entry::TAO_AV_CONSUMER,
04421 the_qos,
04422 address,
04423 use_flow_protocol);
04424 }
04425
04426
04427
04428
04429 TAO_Tokenizer::TAO_Tokenizer (const char *string, char delimiter)
04430 :token_array_ (10),
04431 count_ (0)
04432 {
04433 this->parse (string, delimiter);
04434 }
04435
04436 TAO_Tokenizer::~TAO_Tokenizer ()
04437 {
04438 for (unsigned int i=0; i<this->num_tokens_; i++)
04439 CORBA::string_free (this->token_array_[i]);
04440 }
04441
04442
04443 int
04444 TAO_Tokenizer::parse (const char *string, char delimiter)
04445 {
04446 ACE_CString new_string (string);
04447 u_int pos =0;
04448 ACE_CString::size_type slash_pos = 0;
04449 u_int count = 0;
04450 int result;
04451 while (pos < new_string.length ())
04452 {
04453 slash_pos = new_string.find (delimiter, pos);
04454 ACE_CString substring;
04455 if (slash_pos != new_string.npos)
04456 {
04457 substring = new_string.substring (pos,
04458 slash_pos - pos);
04459 pos = slash_pos + 1;
04460 }
04461 else
04462 {
04463 substring = new_string.substring (pos);
04464 pos = static_cast<int> (new_string.length ());
04465 }
04466 char *token = CORBA::string_dup (substring.c_str ());
04467 result = this->token_array_.set (token, count);
04468 if (result == -1)
04469 {
04470 this->token_array_.size (this->token_array_.size ()*2);
04471 result = this->token_array_.set (token, count);
04472 if (result == -1)
04473 ACE_ERROR_RETURN ((LM_ERROR, "TAO_Tokenizer::parse error"), -1);
04474 }
04475 count++;
04476 }
04477
04478
04479
04480
04481
04482
04483
04484
04485
04486
04487
04488
04489
04490
04491
04492
04493
04494
04495
04496
04497
04498 this->num_tokens_ = count;
04499 return 0;
04500 }
04501
04502 char*
04503 TAO_Tokenizer::token (void)
04504 {
04505 if (count_ < num_tokens_)
04506 return CORBA::string_dup (this->token_array_[this->count_++]);
04507 else
04508 return 0;
04509 }
04510
04511 int
04512 TAO_Tokenizer::num_tokens (void)
04513 {
04514 return static_cast<int> (this->num_tokens_);
04515 }
04516
04517 const char *
04518 TAO_Tokenizer::operator [] (size_t index) const
04519 {
04520 if (index >= this->num_tokens_)
04521 return 0;
04522
04523 return this->token_array_[index];
04524 }
04525
04526 TAO_END_VERSIONED_NAMESPACE_DECL