00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "orbsvcs/AV/AVStreams_i.h"
00014 #include "orbsvcs/AV/sfp.h"
00015 #include "orbsvcs/AV/MCast.h"
00016 #include "orbsvcs/AV/RTCP.h"
00017
00018 #include "tao/debug.h"
00019 #include "tao/ORB_Core.h"
00020 #include "tao/AnyTypeCode/Any.h"
00021 #include "ace/OS_NS_arpa_inet.h"
00022
00023 #if !defined (__ACE_INLINE__)
00024 #include "orbsvcs/AV/AVStreams_i.inl"
00025 #endif
00026
00027 ACE_RCSID (AV,
00028 AVStreams_i,
00029 "$Id: AVStreams_i.cpp 90345 2010-05-29 12:11:02Z johnnyw $")
00030
00031 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00032
00033
00034
00035
00036
00037 TAO_AV_QoS::TAO_AV_QoS (void)
00038 {
00039 }
00040
00041 TAO_AV_QoS::TAO_AV_QoS (AVStreams::streamQoS &stream_qos)
00042 {
00043 this->set (stream_qos);
00044 }
00045
00046 int
00047 TAO_AV_QoS::convert (AVStreams::streamQoS &)
00048 {
00049 return -1;
00050 }
00051
00052
00053
00054
00055
00056 AV_Null_MediaCtrl::AV_Null_MediaCtrl (void)
00057 {
00058 }
00059
00060 AV_Null_MediaCtrl::~AV_Null_MediaCtrl (void)
00061 {
00062 }
00063
00064
00065
00066
00067
00068
00069
00070 TAO_Basic_StreamCtrl::TAO_Basic_StreamCtrl (void)
00071 :flow_count_ (0)
00072 {
00073 }
00074
00075
00076
00077
00078 void
00079 TAO_Basic_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec)
00080 {
00081 try
00082 {
00083
00084
00085 if (this->flow_connection_map_.current_size () > 0)
00086 {
00087 if (flow_spec.length () > 0)
00088 for (u_int i=0;i<flow_spec.length ();i++)
00089 {
00090 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00091 ACE_CString flow_name_key (flowname);
00092 AVStreams::FlowConnection_var flow_connection_entry;
00093 if (this->flow_connection_map_.find (flow_name_key,
00094 flow_connection_entry) == 0)
00095 {
00096 flow_connection_entry->stop ();
00097 }
00098 }
00099 else
00100 {
00101
00102 FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00103 FlowConnection_Map_Entry *entry;
00104 for (;iterator.next (entry) != 0;iterator.advance ())
00105 {
00106 entry->int_id_->stop ();
00107 }
00108 }
00109 }
00110 }
00111 catch (const AVStreams::noSuchFlow&)
00112 {
00113 throw;
00114 }
00115 catch (const CORBA::Exception& ex)
00116 {
00117 ex._tao_print_exception ("TAO_Basic_StreamCtrl::stop");
00118 throw;
00119 }
00120 catch(...)
00121 {
00122 printf ("TAO_Basic_StreamCtrl::stop - unknown exception\n");
00123 }
00124 }
00125
00126
00127
00128 void
00129 TAO_Basic_StreamCtrl::start (const AVStreams::flowSpec &flow_spec)
00130 {
00131 try
00132 {
00133
00134
00135
00136 if (this->flow_connection_map_.current_size () > 0)
00137 {
00138 if (flow_spec.length () > 0)
00139 for (u_int i = 0; i < flow_spec.length (); i++)
00140 {
00141 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00142 ACE_CString flow_name_key (flowname);
00143 FlowConnection_Map::ENTRY *flow_connection_entry = 0;
00144 if (this->flow_connection_map_.find (flow_name_key,
00145 flow_connection_entry) == 0)
00146 {
00147 flow_connection_entry->int_id_->start ();
00148 }
00149 }
00150 else
00151 {
00152
00153 FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00154 FlowConnection_Map_Entry *entry = 0;
00155 for (;iterator.next (entry) != 0;iterator.advance ())
00156 {
00157 entry->int_id_->start ();
00158 }
00159 }
00160 }
00161 }
00162 catch (const AVStreams::noSuchFlow&)
00163 {
00164 throw;
00165 }
00166 catch (const CORBA::Exception& ex)
00167 {
00168 ex._tao_print_exception ("TAO_Basic_StreamCtrl::start");
00169 throw;
00170 }
00171 catch(...)
00172 {
00173 printf ("TAO_Basic_StreamCtrl::start - unknown exception\n");
00174 }
00175 }
00176
00177
00178
00179
00180
00181 void
00182 TAO_Basic_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec)
00183 {
00184 try
00185 {
00186
00187 if (this->flow_connection_map_.current_size () > 0)
00188 {
00189 if (flow_spec.length () > 0)
00190 {
00191 for (u_int i=0;i<flow_spec.length ();i++)
00192 {
00193 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00194 ACE_CString flow_name_key (flowname);
00195 FlowConnection_Map::ENTRY *flow_connection_entry = 0;
00196 if (this->flow_connection_map_.find (flow_name_key, flow_connection_entry) == 0)
00197 {
00198 flow_connection_entry->int_id_->destroy ();
00199 }
00200 }
00201 }
00202 else
00203 {
00204
00205 FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00206 FlowConnection_Map_Entry *entry = 0;
00207 for (;iterator.next (entry) != 0;iterator.advance ())
00208 {
00209 entry->int_id_->destroy ();
00210 }
00211 }
00212 }
00213 }
00214 catch (const CORBA::Exception& ex)
00215 {
00216 ex._tao_print_exception ("TAO_Basic_StreamCtrl::destroy");
00217 return;
00218 }
00219 }
00220
00221
00222
00223 CORBA::Boolean
00224
00225 TAO_Basic_StreamCtrl::modify_QoS (AVStreams::streamQoS & ,
00226 const AVStreams::flowSpec &)
00227 {
00228 return 1;
00229 }
00230
00231
00232
00233 void
00234 TAO_Basic_StreamCtrl::push_event (const struct CosPropertyService::Property &)
00235 {
00236 if (TAO_debug_level > 0)
00237 ACE_DEBUG ((LM_DEBUG, "\n(%P|%t) Recieved event \""));
00238 }
00239
00240
00241 void
00242 TAO_Basic_StreamCtrl::set_FPStatus (const AVStreams::flowSpec &flow_spec,
00243 const char *fp_name,
00244 const CORBA::Any &fp_settings)
00245
00246 {
00247 if (!CORBA::is_nil (this->sep_a_.in ()))
00248 {
00249 this->sep_a_->set_FPStatus (flow_spec, fp_name, fp_settings);
00250 }
00251 }
00252
00253
00254 CORBA::Object_ptr
00255 TAO_Basic_StreamCtrl::get_flow_connection (const char *flow_name)
00256 {
00257 ACE_CString flow_name_key (flow_name);
00258 AVStreams::FlowConnection_var flow_connection_entry;
00259
00260 if (this->flow_connection_map_.find (flow_name_key, flow_connection_entry) == 0){
00261 return flow_connection_entry._retn();
00262 }
00263 else{
00264 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) Cannot find flow: %s\n", flow_name ));
00265 throw AVStreams::noSuchFlow ();
00266 }
00267 }
00268
00269
00270 void
00271 TAO_Basic_StreamCtrl::set_flow_connection (const char *flow_name,
00272 CORBA::Object_ptr flow_connection_obj)
00273 {
00274 AVStreams::FlowConnection_var flow_connection;
00275 try
00276 {
00277 flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj);
00278 }
00279 catch (const CORBA::Exception& ex)
00280 {
00281 ex._tao_print_exception (
00282 "TAO_Basic_StreamCtrl::set_flow_connection");
00283 return;
00284 }
00285
00286 this->flows_.length (this->flow_count_ + 1);
00287 this->flows_ [this->flow_count_++] = CORBA::string_dup (flow_name);
00288 ACE_CString flow_name_key (flow_name);
00289 if (this->flow_connection_map_.bind (flow_name_key, flow_connection) != 0)
00290 {
00291 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) Cannot find flow: %s\n", flow_name ));
00292 throw AVStreams::noSuchFlow ();
00293 }
00294 }
00295
00296 TAO_Basic_StreamCtrl::~TAO_Basic_StreamCtrl (void)
00297 {
00298 }
00299
00300
00301
00302
00303
00304 CORBA::Boolean
00305 TAO_Negotiator::negotiate (AVStreams::Negotiator_ptr ,
00306 const AVStreams::streamQoS &)
00307 {
00308 ACE_DEBUG ((LM_DEBUG,
00309 "TAO_Negotiator::negotiate\n"));
00310 return 0;
00311 }
00312
00313
00314
00315
00316
00317 const int MMDevice_Map_Hash_Key::hash_maximum_ = 10000;
00318
00319
00320 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (void)
00321 {
00322 this->mmdevice_ = AVStreams::MMDevice::_nil ();
00323 }
00324
00325
00326 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (AVStreams::MMDevice_ptr mmdevice)
00327 {
00328 this->mmdevice_ = AVStreams::MMDevice::_duplicate (mmdevice);
00329 }
00330
00331
00332 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (const MMDevice_Map_Hash_Key& hash_key)
00333 {
00334 this->mmdevice_ = AVStreams::MMDevice::_duplicate (hash_key.mmdevice_);
00335 }
00336
00337
00338 MMDevice_Map_Hash_Key::~MMDevice_Map_Hash_Key (void)
00339 {
00340 CORBA::release (this->mmdevice_);
00341 }
00342
00343 bool
00344 MMDevice_Map_Hash_Key::operator == (const MMDevice_Map_Hash_Key &hash_key) const
00345 {
00346 CORBA::Boolean result = 0;
00347 try
00348 {
00349 result =
00350 this->mmdevice_->_is_equivalent (hash_key.mmdevice_);
00351 }
00352 catch (const CORBA::Exception& ex)
00353 {
00354 ex._tao_print_exception (
00355 "MMDevice_Map_Hash_Key::operator == ");
00356 return false;
00357 }
00358
00359 return result;
00360 }
00361
00362 bool
00363 operator < (const MMDevice_Map_Hash_Key &left,
00364 const MMDevice_Map_Hash_Key &right)
00365 {
00366 bool result = false;
00367
00368 try
00369 {
00370 const CORBA::ULong left_hash =
00371 left.mmdevice_->_hash (left.hash_maximum_);
00372
00373 const CORBA::ULong right_hash =
00374 right.mmdevice_->_hash (right.hash_maximum_);
00375
00376 result = left_hash < right_hash;
00377 }
00378 catch (const CORBA::Exception& ex)
00379 {
00380 ex._tao_print_exception ("operator < for MMDevice_Map_Hash_Key");
00381 return false;
00382 }
00383
00384 return result;
00385 }
00386
00387 u_long
00388 MMDevice_Map_Hash_Key::hash (void) const
00389 {
00390 u_long result = 0;
00391 try
00392 {
00393 result = this->mmdevice_->_hash (this->hash_maximum_);
00394 }
00395 catch (const CORBA::Exception& ex)
00396 {
00397 ex._tao_print_exception ("MMDevice_Map_Hash_Key::hash");
00398 return 0;
00399 }
00400 return result;
00401 }
00402
00403
00404
00405
00406
00407 TAO_StreamCtrl::TAO_StreamCtrl (void)
00408 :mcastconfigif_ (0)
00409 {
00410 try
00411 {
00412 this->streamctrl_ = this->_this ();
00413 char buf [BUFSIZ];
00414 int result = ACE_OS::hostname (buf, BUFSIZ);
00415 unsigned long ipaddr = 0;
00416 if (result == 0)
00417 ipaddr = ACE_OS::inet_addr (buf);
00418 this->source_id_ = TAO_AV_RTCP::alloc_srcid (ipaddr);
00419 }
00420 catch (const CORBA::Exception& ex)
00421 {
00422 ex._tao_print_exception ("TAO_StreamCtrl::TAO_StreamCtrl");
00423 }
00424 }
00425
00426 TAO_StreamCtrl::~TAO_StreamCtrl (void)
00427 {
00428 delete this->mcastconfigif_;
00429 }
00430
00431
00432
00433
00434 void
00435 TAO_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec)
00436 {
00437 try
00438 {
00439 TAO_Basic_StreamCtrl::stop (flow_spec);
00440 if (this->flow_connection_map_.current_size () > 0)
00441 return;
00442 MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00443 MMDevice_Map::ENTRY *entry = 0;
00444 for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00445 {
00446 entry->int_id_.sep_->stop (flow_spec);
00447 }
00448 MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00449 for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00450 {
00451 entry->int_id_.sep_->stop (flow_spec);
00452 }
00453 }
00454 catch (const AVStreams::noSuchFlow&)
00455 {
00456 throw;
00457 }
00458 catch (const CORBA::Exception& ex)
00459 {
00460 ex._tao_print_exception ("TAO_StreamCtrl::stop");
00461 throw;
00462 }
00463 catch(...)
00464 {
00465 printf ("TAO_StreamCtrl::stop - unknow exception\n");
00466 }
00467 }
00468
00469
00470
00471 void
00472 TAO_StreamCtrl::start (const AVStreams::flowSpec &flow_spec)
00473 {
00474 try
00475 {
00476 TAO_Basic_StreamCtrl::start (flow_spec);
00477 if (this->flow_connection_map_.current_size () > 0)
00478 return;
00479
00480 MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00481 MMDevice_Map::ENTRY *entry = 0;
00482 for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00483 {
00484 entry->int_id_.sep_->start (flow_spec);
00485 }
00486 MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00487 for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00488 {
00489 entry->int_id_.sep_->start (flow_spec);
00490 }
00491 }
00492 catch (const AVStreams::noSuchFlow&)
00493 {
00494 throw;
00495 }
00496 catch (const CORBA::Exception& ex)
00497 {
00498 ex._tao_print_exception ("TAO_StreamCtrl::start");
00499 throw;
00500 }
00501 catch(...)
00502 {
00503 printf ("TAO_StreamCtrl::start - unknow exception\n");
00504 }
00505 }
00506
00507
00508
00509
00510 void
00511 TAO_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec)
00512 {
00513 try
00514 {
00515 TAO_Basic_StreamCtrl::destroy (flow_spec);
00516 if (this->flow_connection_map_.current_size () > 0)
00517 return;
00518
00519 MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00520 MMDevice_Map::ENTRY *entry = 0;
00521 for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00522 {
00523 entry->int_id_.sep_->destroy (flow_spec);
00524 }
00525 MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00526 for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00527 {
00528 entry->int_id_.sep_->destroy (flow_spec);
00529 }
00530 }
00531 catch (const CORBA::Exception& ex)
00532 {
00533 ex._tao_print_exception ("TAO_StreamCtrl::destroy");
00534 return;
00535 }
00536
00537 int result = TAO_AV_Core::deactivate_servant (this);
00538 if (result < 0)
00539 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::destroy failed\n"));
00540 }
00541
00542
00543
00544
00545
00546 CORBA::Boolean
00547 TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
00548 AVStreams::MMDevice_ptr b_party,
00549 AVStreams::streamQoS &the_qos,
00550 const AVStreams::flowSpec &the_flows)
00551 {
00552 try
00553 {
00554 if (CORBA::is_nil (a_party) && CORBA::is_nil (b_party))
00555 ACE_ERROR_RETURN ((LM_ERROR, "Both parties are nil\n"), 0);
00556
00557 if (TAO_debug_level > 0)
00558 if (CORBA::is_nil (a_party) ||
00559 CORBA::is_nil (b_party))
00560 if (TAO_debug_level > 0)
00561 ACE_DEBUG ((LM_DEBUG,
00562 "(%P|%t) TAO_StreamCtrl::bind_devs: "
00563 "a_party or b_party is null"
00564 "Multicast mode\n"));
00565
00566
00567 CORBA::Boolean met_qos;
00568 CORBA::String_var named_vdev;
00569
00570 if (!CORBA::is_nil (a_party))
00571 {
00572 MMDevice_Map_Hash_Key find_key (a_party);
00573 MMDevice_Map_Entry find_entry;
00574 int result =
00575 this->mmdevice_a_map_.find (find_key, find_entry);
00576 if (result == 0)
00577 {
00578 if (TAO_debug_level > 0)
00579 {
00580
00581 if (TAO_debug_level > 0)
00582 ACE_DEBUG ((LM_DEBUG, "mmdevice a_party is already bound\n"));
00583 }
00584 return 1;
00585 }
00586 else
00587 {
00588 this->sep_a_ =
00589 a_party-> create_A (this->streamctrl_.in (),
00590 this->vdev_a_.out (),
00591 the_qos,
00592 met_qos,
00593 named_vdev.inout (),
00594 the_flows);
00595
00596 if (TAO_debug_level > 0)
00597 ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamCtrl::create_A: succeeded\n"));
00598
00599 CORBA::Any streamctrl_any;
00600 streamctrl_any <<= this->streamctrl_.in ();
00601 this->sep_a_->define_property ("Related_StreamCtrl",
00602 streamctrl_any);
00603
00604 CORBA::Any vdev_a_any;
00605 vdev_a_any <<= this->vdev_a_.in ();
00606 this->sep_a_->define_property ("Related_VDev",
00607 vdev_a_any);
00608
00609 CORBA::Any streamendpoint_a_any;
00610 streamendpoint_a_any <<= this->sep_a_.in ();
00611 this->vdev_a_->define_property ("Related_StreamEndpoint",
00612 streamendpoint_a_any);
00613
00614
00615 CORBA::Any mmdevice_a_any;
00616 mmdevice_a_any <<= a_party;
00617 this->vdev_a_->define_property ("Related_MMDevice",
00618 mmdevice_a_any);
00619
00620
00621 MMDevice_Map_Entry map_entry;
00622 MMDevice_Map_Hash_Key key (a_party);
00623 map_entry.sep_ = AVStreams::StreamEndPoint_A::_duplicate (this->sep_a_.in ());
00624 map_entry.vdev_ = AVStreams::VDev::_duplicate (this->vdev_a_.in ());
00625 map_entry.flowspec_ = the_flows;
00626 map_entry.qos_ = the_qos;
00627 result =
00628 this->mmdevice_a_map_.bind (key, map_entry);
00629 if (result < 0)
00630 if (TAO_debug_level > 0)
00631 ACE_DEBUG ((LM_DEBUG, "Error binding mmdevice entry in the a_map"));
00632 }
00633 }
00634
00635 if (!CORBA::is_nil (b_party))
00636 {
00637 MMDevice_Map_Hash_Key find_key (b_party);
00638 MMDevice_Map_Entry find_entry;
00639 int result =
00640 this->mmdevice_b_map_.find (find_key, find_entry);
00641 if (result == 0)
00642 {
00643
00644 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "mmdevice b_party is already bound\n"));
00645 return 1;
00646 }
00647 else
00648 {
00649 this->sep_b_ =
00650 b_party-> create_B (this->streamctrl_.in (),
00651 this->vdev_b_.out (),
00652 the_qos,
00653 met_qos,
00654 named_vdev.inout (),
00655 the_flows);
00656
00657 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamCtrl::create_B: succeeded\n"));
00658
00659 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
00660 "\n(%P|%t)stream_endpoint_b_ = %s",
00661 TAO_ORB_Core_instance ()->orb ()->object_to_string (this->sep_b_.in ())));
00662
00663 CORBA::Any streamctrl_any;
00664 streamctrl_any <<= this->streamctrl_.in ();
00665 this->sep_b_->define_property ("Related_StreamCtrl",
00666 streamctrl_any);
00667
00668 CORBA::Any vdev_b_any;
00669 vdev_b_any <<= this->vdev_b_.in ();
00670 this->sep_b_->define_property ("Related_VDev",
00671 vdev_b_any);
00672
00673 CORBA::Any streamendpoint_b_any;
00674 streamendpoint_b_any <<= this->sep_b_.in ();
00675 this->vdev_b_->define_property ("Related_StreamEndpoint",
00676 streamendpoint_b_any);
00677
00678
00679 CORBA::Any mmdevice_b_any;
00680 mmdevice_b_any <<= b_party;
00681 this->vdev_b_->define_property ("Related_MMDevice",
00682 mmdevice_b_any);
00683
00684 MMDevice_Map_Entry map_entry;
00685 MMDevice_Map_Hash_Key key (b_party);
00686 map_entry.sep_ = AVStreams::StreamEndPoint::_duplicate (this->sep_b_.in ());
00687 map_entry.vdev_ = AVStreams::VDev::_duplicate(this->vdev_b_.in ());
00688 map_entry.flowspec_ = the_flows;
00689 map_entry.qos_ = the_qos;
00690 int result =
00691 this->mmdevice_b_map_.bind (key, map_entry);
00692 if (result < 0)
00693 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Error binding mmdevice entry in the b_map"));
00694 }
00695 }
00696
00697
00698 if ((!CORBA::is_nil (a_party)) && (!CORBA::is_nil (b_party)))
00699 {
00700 CORBA::Any sep_a_peer_any;
00701 CORBA::Any sep_b_peer_any;
00702
00703 sep_a_peer_any <<= this->sep_b_.in();
00704 sep_b_peer_any <<= this->sep_a_.in();
00705 this->sep_a_->define_property ("PeerAdapter",
00706 sep_a_peer_any);
00707
00708 this->sep_b_->define_property ("PeerAdapter",
00709 sep_b_peer_any);
00710 }
00711
00712
00713 if (CORBA::is_nil (b_party) && (!CORBA::is_nil (this->vdev_a_.in ())))
00714 {
00715
00716
00717
00718 try
00719 {
00720 CORBA::Any_ptr flows_any = this->sep_a_->get_property_value ("Flows");
00721 AVStreams::flowSpec_var flows;
00722 *flows_any >>= flows.out ();
00723 for (CORBA::ULong i=0; i< flows->length ();++i)
00724 {
00725 CORBA::Object_var fep_obj =
00726 this->sep_a_->get_fep (flows [i]);
00727 try
00728 {
00729 AVStreams::FlowProducer_var producer =
00730 AVStreams::FlowProducer::_narrow (fep_obj.in ());
00731 producer->set_source_id (this->source_id_++);
00732 }
00733 catch (const CORBA::Exception& ex)
00734 {
00735 if (TAO_debug_level > 0)
00736 ACE_DEBUG ((LM_DEBUG, " %s ", static_cast<char const*>(flows[i])));
00737
00738 ex._tao_print_exception (
00739 "producer_check: not a producer");
00740
00741 }
00742 }
00743 }
00744 catch (const CORBA::Exception&)
00745 {
00746
00747
00748
00749
00750
00751
00752 this->sep_a_->set_source_id (this->source_id_++);
00753 }
00754 if (!this->mcastconfigif_)
00755 {
00756 ACE_NEW_RETURN (this->mcastconfigif_,
00757 TAO_MCastConfigIf,
00758 0);
00759
00760 this->mcastconfigif_ptr_ = this->mcastconfigif_->_this ();
00761 }
00762
00763 CORBA::Boolean result = this->vdev_a_->set_Mcast_peer (this->streamctrl_.in (),
00764 this->mcastconfigif_ptr_.in (),
00765 the_qos,
00766 the_flows);
00767 if (!result)
00768 ACE_ERROR_RETURN ((LM_ERROR, "set_Mcast_peer failed\n"), 0);
00769 }
00770
00771 if (CORBA::is_nil (a_party))
00772 {
00773 if (!CORBA::is_nil (this->vdev_b_.in ()))
00774 {
00775
00776 if (!this->mcastconfigif_)
00777 ACE_ERROR_RETURN ((LM_ERROR, "first add a source and then a sink\n"), 0);
00778 this->mcastconfigif_->set_peer (this->vdev_b_.in (),
00779 the_qos,
00780 the_flows);
00781 }
00782
00783 int connect_leaf_success = 0;
00784 try
00785 {
00786
00787
00788 connect_leaf_success = this->sep_a_->connect_leaf (this->sep_b_.in (),
00789 the_qos,
00790 the_flows);
00791 connect_leaf_success = 1;
00792 }
00793 catch (const AVStreams::notSupported&)
00794 {
00795 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "connect_leaf failed\n"));
00796 connect_leaf_success = 0;
00797 }
00798 catch (const CORBA::Exception& ex)
00799 {
00800 ex._tao_print_exception (
00801 "TAO_StreamCtrl::bind_devs");
00802 }
00803 if (!connect_leaf_success)
00804 {
00805 if (TAO_debug_level > 0)
00806 ACE_DEBUG ((LM_DEBUG,"TAO_StreamCtrl::bind_devs Multiconnect\n"));
00807 AVStreams::flowSpec connect_flows = the_flows;
00808 this->sep_a_->multiconnect (the_qos, connect_flows);
00809 this->sep_b_->multiconnect (the_qos, connect_flows);
00810 }
00811 }
00812
00813 if (!CORBA::is_nil (a_party) && !CORBA::is_nil (b_party))
00814 {
00815
00816
00817
00818
00819 if( a_party->is_property_defined("Flows") &&
00820 b_party->is_property_defined("Flows") )
00821 {
00822 if (TAO_debug_level > 0) {
00823
00824 ACE_DEBUG ((LM_DEBUG, "(%N,%l) Full profile, invoking bind()\n"));
00825
00826 }
00827
00828
00829
00830
00831 this->bind (this->sep_a_.in (),
00832 this->sep_b_.in (),
00833 the_qos,
00834 the_flows);
00835
00836
00837
00838 }
00839
00840 else if (!CORBA::is_nil (this->vdev_a_.in ()) && !CORBA::is_nil (this->vdev_b_.in ()))
00841 {
00842 if (TAO_debug_level > 0) {
00843
00844 ACE_DEBUG ((LM_DEBUG, "(%N,%l) Light profile, invoking connect()\n"));
00845
00846 }
00847
00848
00849 this->vdev_a_->set_peer (this->streamctrl_.in (),
00850 this->vdev_b_.in (),
00851 the_qos,
00852 the_flows);
00853
00854 this->vdev_b_->set_peer (this->streamctrl_.in (),
00855 this->vdev_a_.in (),
00856 the_qos,
00857 the_flows);
00858
00859
00860
00861
00862 CORBA::Boolean result =
00863 this->sep_a_->connect (this->sep_b_.in (),
00864 the_qos,
00865 the_flows);
00866 if (result == 0)
00867 ACE_ERROR_RETURN ((LM_ERROR, "sep_a->connect (sep_b) failed\n"), 0);
00868 }
00869 }
00870 }
00871 catch (const CORBA::Exception& ex)
00872 {
00873 ex._tao_print_exception ("TAO_StreamCtrl::bind_devs");
00874 return 0;
00875 }
00876 return 1;
00877 }
00878
00879
00880
00881 CORBA::Boolean
00882 TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a,
00883 AVStreams::StreamEndPoint_B_ptr sep_b,
00884 AVStreams::streamQoS &stream_qos,
00885 const AVStreams::flowSpec &flow_spec)
00886 {
00887 this->sep_a_ = AVStreams::StreamEndPoint_A::_duplicate(sep_a);
00888 this->sep_b_ = AVStreams::StreamEndPoint_B::_duplicate(sep_b);
00889
00890 int result = 0;
00891 try
00892 {
00893 if (CORBA::is_nil (sep_a_.in() ) ||
00894 CORBA::is_nil (sep_b_.in() ))
00895 ACE_ERROR_RETURN ((LM_ERROR,
00896 "(%P|%t) TAO_StreamCtrl::bind:"
00897 "a_party or b_party null!"),
00898 0);
00899
00900
00901 CORBA::Any sep_any;
00902 sep_any <<= sep_b;
00903 sep_a_->define_property ("PeerAdapter",
00904 sep_any);
00905 sep_any <<= sep_a;
00906 sep_b_->define_property ("PeerAdapter",
00907 sep_any);
00908
00909
00910
00911 AVStreams::flowSpec a_flows, b_flows;
00912 CORBA::Any_var flows_any;
00913 flows_any = sep_a_->get_property_value ("Flows");
00914 AVStreams::flowSpec *temp_flows = 0;
00915 flows_any.in () >>= temp_flows;
00916 a_flows = *temp_flows;
00917 flows_any = sep_b_->get_property_value ("Flows");
00918 flows_any.in () >>= temp_flows;
00919 b_flows = *temp_flows;
00920 u_int i;
00921 FlowEndPoint_Map *a_fep_map;
00922 FlowEndPoint_Map *b_fep_map;
00923 ACE_NEW_RETURN (a_fep_map,
00924 FlowEndPoint_Map,
00925 0);
00926 ACE_NEW_RETURN (b_fep_map,
00927 FlowEndPoint_Map,
00928 0);
00929 for (i=0;i<a_flows.length ();i++)
00930 {
00931 const char *flowname = a_flows[i];
00932
00933 CORBA::Object_var fep_obj =
00934 sep_a_->get_fep (flowname);
00935 AVStreams::FlowEndPoint_var fep =
00936 AVStreams::FlowEndPoint::_narrow (fep_obj.in ());
00937 ACE_CString fep_key (flowname);
00938 result = a_fep_map->bind (fep_key, fep);
00939 if (result == -1)
00940 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::bind failed for %s\n", flowname));
00941 }
00942
00943 for (i=0;i<b_flows.length ();i++)
00944 {
00945 const char *flowname = b_flows[i];
00946
00947 CORBA::Object_var fep_obj =
00948 sep_b->get_fep (flowname);
00949 AVStreams::FlowEndPoint_var fep =
00950 AVStreams::FlowEndPoint::_narrow (fep_obj.in ());
00951 ACE_CString fep_key (flowname);
00952 result = b_fep_map->bind (fep_key, fep);
00953 if (result == -1)
00954 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::bind failed for %s\n", flowname));
00955 }
00956 FlowEndPoint_Map *map_a = 0, *map_b = 0;
00957 if (flow_spec.length () == 0)
00958 {
00959 map_a = a_fep_map;
00960 map_b = b_fep_map;
00961 }
00962 else
00963 {
00964 FlowEndPoint_Map *spec_fep_map_a, *spec_fep_map_b;
00965 ACE_NEW_RETURN (spec_fep_map_a,
00966 FlowEndPoint_Map,
00967 0);
00968 ACE_NEW_RETURN (spec_fep_map_b,
00969 FlowEndPoint_Map,
00970 0);
00971 for (i=0; i< flow_spec.length ();i++)
00972 {
00973 TAO_Forward_FlowSpec_Entry *entry = 0;
00974 ACE_NEW_RETURN (entry,
00975 TAO_Forward_FlowSpec_Entry,
00976 0);
00977 entry->parse (flow_spec[i]);
00978 ACE_CString fep_key (entry->flowname ());
00979 AVStreams::FlowEndPoint_var fep;
00980 result = a_fep_map->find (fep_key, fep);
00981 if (result == -1)
00982 ACE_ERROR_RETURN ((LM_ERROR, "Fep not found on A side for flowname: %s\n", flow_spec[i].in ()), 0);
00983
00984 result = spec_fep_map_a->bind (fep_key, fep);
00985 if (result == -1)
00986 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Bind failed for %s\n", flow_spec[i].in ()));
00987
00988 result = b_fep_map->find (fep_key, fep);
00989 if (result == -1)
00990 ACE_ERROR_RETURN ((LM_ERROR, "Fep not found on B side for flowname: %s\n", flow_spec[i].in ()), 0);
00991
00992 result = spec_fep_map_b->bind (fep_key, fep);
00993 if (result == -1)
00994 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Bind failed for %s\n", flow_spec[i].in ()));
00995 }
00996 map_a = spec_fep_map_a;
00997 map_b = spec_fep_map_b;
00998 }
00999
01000 TAO_AV_QoS qos (stream_qos);
01001
01002
01003 FlowEndPoint_Map_Iterator a_feps_iterator (*map_a);
01004 FlowEndPoint_Map_Entry *a_feps_entry, *b_feps_entry;
01005 try
01006 {
01007
01008 for (;a_feps_iterator.next (a_feps_entry) != 0;
01009 a_feps_iterator.advance ())
01010 {
01011 AVStreams::FlowEndPoint_var fep_a = a_feps_entry->int_id_;
01012 AVStreams::FlowEndPoint_var connected_to =
01013 fep_a->get_connected_fep ();
01014
01015 if (!CORBA::is_nil (connected_to.in ()))
01016 {
01017
01018 continue;
01019 }
01020
01021 FlowEndPoint_Map_Iterator b_feps_iterator (*map_b);
01022 for (;b_feps_iterator.next (b_feps_entry) != 0;
01023 b_feps_iterator.advance ())
01024 {
01025 AVStreams::FlowEndPoint_var fep_b = b_feps_entry->int_id_;
01026 AVStreams::FlowConnection_var flow_connection;
01027
01028 AVStreams::FlowEndPoint_var connected_to =
01029 fep_b->get_connected_fep ();
01030
01031 if (!CORBA::is_nil (connected_to.in ()))
01032 {
01033
01034 continue;
01035 }
01036
01037 if (fep_a->is_fep_compatible (fep_b.in()) == 1)
01038 {
01039
01040
01041 CORBA::Object_var flow_connection_obj;
01042 CORBA::Any_var flowname_any =
01043 fep_a->get_property_value ("FlowName");
01044 const char *flowname = 0;
01045 flowname_any.in () >>= flowname;
01046 try
01047 {
01048 flow_connection_obj =
01049 this->get_flow_connection (flowname);
01050 flow_connection =
01051 AVStreams::FlowConnection::_narrow (flow_connection_obj.in ());
01052 }
01053 catch (const CORBA::Exception&)
01054 {
01055 TAO_FlowConnection *flowConnection;
01056 ACE_NEW_RETURN (flowConnection,
01057 TAO_FlowConnection,
01058 0);
01059 flow_connection = flowConnection->_this ();
01060 this->set_flow_connection (flowname,
01061 flow_connection.in ());
01062 }
01063
01064
01065
01066
01067
01068
01069
01070 AVStreams::FlowProducer_var producer;
01071 AVStreams::FlowConsumer_var consumer;
01072
01073 try
01074 {
01075 producer =
01076 AVStreams::FlowProducer::_narrow (fep_a.in());
01077 consumer =
01078 AVStreams::FlowConsumer::_narrow (fep_b.in());
01079
01080
01081
01082 if (CORBA::is_nil (producer.in ()))
01083 {
01084 producer =
01085 AVStreams::FlowProducer::_narrow (fep_b.in());
01086 consumer =
01087 AVStreams::FlowConsumer::_narrow (fep_a.in());
01088 }
01089
01090
01091
01092
01093 ACE_ASSERT (!CORBA::is_nil (producer.in ()));
01094 ACE_ASSERT (!CORBA::is_nil (consumer.in ()));
01095 }
01096 catch (const CORBA::Exception&)
01097 {
01098
01099 throw;
01100 }
01101 CORBA::String_var fep_a_name, fep_b_name;
01102 flowname_any = fep_a->get_property_value ("FlowName");
01103 const char *temp_name = 0;
01104 flowname_any.in () >>= temp_name;
01105 fep_a_name = CORBA::string_dup (temp_name);
01106 flowname_any = fep_b->get_property_value ("FlowName");
01107 flowname_any.in () >>= temp_name;
01108 fep_b_name = CORBA::string_dup (temp_name);
01109 AVStreams::QoS flow_qos;
01110 flow_qos.QoSType = fep_a_name;
01111 flow_qos.QoSParams.length (0);
01112 result = qos.get_flow_qos (fep_a_name.in (), flow_qos);
01113 if (result == -1)
01114 {
01115 flow_qos.QoSType = fep_b_name;
01116 result = qos.get_flow_qos (fep_b_name.in (),
01117 flow_qos);
01118 if (result == -1 && TAO_debug_level > 0)
01119 ACE_DEBUG ((LM_DEBUG,
01120 "No QoS Specified for this flow <%s>\n", flowname));
01121 }
01122 flow_connection->connect (producer.in (),
01123 consumer.in (),
01124 flow_qos);
01125 }
01126 }
01127 }
01128 }
01129 catch (const CORBA::Exception& ex)
01130 {
01131 ex._tao_print_exception ("TAO_StreamCtrl::bind:flow_connect block");
01132 return 0;
01133 }
01134 }
01135 catch (const CORBA::Exception&)
01136 {
01137
01138
01139 this->sep_a_->connect (this->sep_b_.in (),
01140 stream_qos,
01141 flow_spec);
01142 }
01143 return 1;
01144 }
01145
01146 void
01147 TAO_StreamCtrl::unbind (void)
01148 {
01149 try
01150 {
01151 if (this->flow_connection_map_.current_size () > 0)
01152 return;
01153
01154 AVStreams::flowSpec flow_spec;
01155 flow_spec.length(0);
01156
01157 MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
01158 MMDevice_Map::ENTRY *entry = 0;
01159 for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
01160 {
01161 entry->int_id_.sep_->destroy (flow_spec);
01162 }
01163 MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
01164 for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
01165 {
01166 entry->int_id_.sep_->destroy (flow_spec);
01167 }
01168 }
01169 catch (const CORBA::Exception& ex)
01170 {
01171 ex._tao_print_exception ("TAO_StreamCtrl::unbind");
01172 return;
01173 }
01174 }
01175
01176 void
01177 TAO_StreamCtrl::unbind_party (AVStreams::StreamEndPoint_ptr ,
01178 const AVStreams::flowSpec &)
01179 {
01180 }
01181
01182 void
01183 TAO_StreamCtrl::unbind_dev (AVStreams::MMDevice_ptr ,
01184 const AVStreams::flowSpec & )
01185 {
01186 }
01187
01188 AVStreams::VDev_ptr
01189 TAO_StreamCtrl::get_related_vdev (AVStreams::MMDevice_ptr adev,
01190 AVStreams::StreamEndPoint_out sep)
01191 {
01192 MMDevice_Map_Hash_Key key (adev);
01193 MMDevice_Map_Entry entry;
01194 int result = -1;
01195 result = this->mmdevice_a_map_.find (key, entry);
01196 if (result < 0)
01197 {
01198 result = this->mmdevice_a_map_.find (key, entry);
01199 if (result < 0)
01200 return AVStreams::VDev::_nil ();
01201 }
01202 sep = AVStreams::StreamEndPoint::_duplicate (entry.sep_.in ());
01203 return AVStreams::VDev::_duplicate (entry.vdev_.in ());
01204 }
01205
01206 CORBA::Boolean
01207
01208 TAO_StreamCtrl::modify_QoS (AVStreams::streamQoS &new_qos,
01209 const AVStreams::flowSpec &the_spec)
01210 {
01211 if (TAO_debug_level > 0)
01212 ACE_DEBUG ((LM_DEBUG,
01213 "TAO_StreamCtrl::modify_QoS\n"));
01214
01215
01216 if (this->mcastconfigif_ != 0)
01217 {
01218
01219 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Cannot Modify the Qos for multipoint streams\n"));
01220 }
01221 else
01222 {
01223 try
01224 {
01225 AVStreams::flowSpec in_flowspec;
01226 AVStreams::flowSpec out_flowspec;
01227
01228 in_flowspec.length (0);
01229 out_flowspec.length (0);
01230
01231 int in_index = 0;
01232 int out_index = 0;
01233
01234 AVStreams::flowSpec flowspec;
01235 if (the_spec.length () == 0)
01236 {
01237
01238 flowspec = this->flows_;
01239 MMDevice_Map_Iterator iterator (this->mmdevice_a_map_);
01240 MMDevice_Map::ENTRY *entry = 0;
01241 for (;iterator.next (entry) != 0;iterator.advance ())
01242 {
01243 flowspec = entry->int_id_.flowspec_;
01244 }
01245 }
01246 else
01247 {
01248 flowspec = the_spec;
01249 }
01250
01251 if (TAO_debug_level > 0)
01252 ACE_DEBUG ((LM_DEBUG,
01253 "TAO_StreamCtrl::modify_QoS\n"));
01254
01255
01256 for (u_int i=0;i < flowspec.length ();i++)
01257 {
01258 TAO_Forward_FlowSpec_Entry entry;
01259 entry.parse (flowspec [i]);
01260 int direction = entry.direction ();
01261 if (direction == 0)
01262 {
01263 in_flowspec.length (in_index + 1);
01264 in_flowspec [in_index++] = CORBA::string_dup (entry.entry_to_string ());
01265 }
01266 else
01267 {
01268 out_flowspec.length (out_index + 1);
01269 out_flowspec [out_index++] = CORBA::string_dup (entry.entry_to_string ());
01270 }
01271 }
01272
01273 if (in_flowspec.length () != 0)
01274 {
01275 this->vdev_a_->modify_QoS (new_qos, in_flowspec);
01276 }
01277
01278 if (out_flowspec.length () != 0)
01279 {
01280 this->vdev_b_->modify_QoS (new_qos, out_flowspec);
01281 }
01282 }
01283 catch (const CORBA::Exception& ex)
01284 {
01285 ex._tao_print_exception ("TAO_StreamCtrl::modify_QoS");
01286 return 0;
01287 }
01288
01289 }
01290 return 1;
01291 }
01292
01293
01294
01295
01296
01297 TAO_MCastConfigIf::TAO_MCastConfigIf (void)
01298 :peer_list_iterator_ (peer_list_)
01299 {
01300 }
01301
01302 TAO_MCastConfigIf::~TAO_MCastConfigIf (void)
01303 {
01304
01305 }
01306
01307
01308 CORBA::Boolean
01309 TAO_MCastConfigIf::set_peer (CORBA::Object_ptr peer,
01310 AVStreams::streamQoS & qos,
01311 const AVStreams::flowSpec & flow_spec)
01312 {
01313 try
01314 {
01315 Peer_Info *info;
01316 ACE_NEW_RETURN (info,
01317 Peer_Info,
01318 0);
01319 info->peer_ = AVStreams::VDev::_narrow (peer);
01320 info->qos_ = qos;
01321 info->flow_spec_ = flow_spec;
01322 this->peer_list_.insert_tail (info);
01323 }
01324 catch (const CORBA::Exception& ex)
01325 {
01326 ex._tao_print_exception ("TAO_MCastConfigIf::set_peer");
01327 return 0;
01328 }
01329 return 1;
01330 }
01331
01332
01333 void
01334 TAO_MCastConfigIf::configure (const CosPropertyService::Property & a_configuration)
01335 {
01336 Peer_Info *info;
01337 try
01338 {
01339 for (this->peer_list_iterator_.first ();
01340 (info = this->peer_list_iterator_.next ()) != 0;
01341 this->peer_list_iterator_.advance ())
01342 {
01343 info->peer_->configure (a_configuration);
01344 }
01345 }
01346 catch (const CORBA::Exception& ex)
01347 {
01348 ex._tao_print_exception (
01349 "TAO_MCastConfigIf::set_configure");
01350 return;
01351 }
01352 }
01353
01354
01355 void
01356 TAO_MCastConfigIf::set_initial_configuration (const CosPropertyService::Properties &initial)
01357 {
01358 this->initial_configuration_ = initial;
01359 }
01360
01361
01362 void
01363 TAO_MCastConfigIf::set_format (const char * flowName,
01364 const char * format_name)
01365 {
01366 Peer_Info *info;
01367 try
01368 {
01369 for (this->peer_list_iterator_.first ();
01370 (info = this->peer_list_iterator_.next ()) != 0;
01371 this->peer_list_iterator_.advance ())
01372 {
01373 if (this->in_flowSpec (info->flow_spec_, flowName))
01374 {
01375 info->peer_->set_format (flowName, format_name);
01376 }
01377 }
01378 }
01379 catch (const CORBA::Exception& ex)
01380 {
01381 ex._tao_print_exception ("TAO_MCastConfigIf::set_format");
01382 return;
01383 }
01384 }
01385
01386
01387 void
01388 TAO_MCastConfigIf::set_dev_params (const char * flowName,
01389 const CosPropertyService::Properties & new_params)
01390 {
01391 Peer_Info *info;
01392 try
01393 {
01394
01395 for (this->peer_list_iterator_.first ();
01396 (info = this->peer_list_iterator_.next ()) != 0;
01397 this->peer_list_iterator_.advance ())
01398 {
01399 if (this->in_flowSpec (info->flow_spec_, flowName))
01400 {
01401 info->peer_->set_dev_params (flowName, new_params);
01402 }
01403 }
01404 }
01405 catch (const CORBA::Exception& ex)
01406 {
01407 ex._tao_print_exception (
01408 "TAO_MCastConfigIf::set_dev_params");
01409 return;
01410 }
01411 }
01412
01413 int
01414 TAO_MCastConfigIf::in_flowSpec (const AVStreams::flowSpec& flow_spec, const char *flow_name)
01415 {
01416 size_t len = ACE_OS::strlen (flow_name);
01417 for (CORBA::ULong i = 0; i < flow_spec.length (); i++)
01418 if (ACE_OS::strncmp (flow_spec[i], flow_name, len) == 0)
01419 {
01420 return 1;
01421 }
01422 return 0;
01423 }
01424
01425
01426
01427
01428
01429 TAO_Base_StreamEndPoint::TAO_Base_StreamEndPoint (void)
01430 : protocol_object_set_ (0)
01431 {
01432 }
01433
01434 TAO_Base_StreamEndPoint::~TAO_Base_StreamEndPoint (void)
01435 {
01436 }
01437
01438 int
01439 TAO_Base_StreamEndPoint::handle_close (void)
01440 {
01441
01442
01443 return -1;
01444 }
01445
01446 int
01447 TAO_Base_StreamEndPoint::handle_open (void)
01448 {
01449 return 0;
01450 }
01451
01452 int
01453 TAO_Base_StreamEndPoint::handle_stop (const AVStreams::flowSpec &)
01454 {
01455 return 0;
01456 }
01457
01458 int
01459 TAO_Base_StreamEndPoint::handle_start (const AVStreams::flowSpec &)
01460 {
01461 return 0;
01462 }
01463
01464 int
01465 TAO_Base_StreamEndPoint::handle_destroy (const AVStreams::flowSpec &)
01466 {
01467 return 0;
01468 }
01469
01470
01471 CORBA::Boolean
01472 TAO_Base_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &)
01473 {
01474 return 1;
01475 }
01476
01477
01478 CORBA::Boolean
01479 TAO_Base_StreamEndPoint::handle_postconnect (AVStreams::flowSpec &)
01480 {
01481
01482 while (!this->is_protocol_object_set ())
01483 TAO_AV_CORE::instance ()->orb ()->perform_work ();
01484 return 1;
01485 }
01486
01487
01488 CORBA::Boolean
01489 TAO_Base_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec &)
01490 {
01491 return 1;
01492 }
01493
01494 int
01495 TAO_Base_StreamEndPoint::set_protocol_object (const char * ,
01496 TAO_AV_Protocol_Object * )
01497 {
01498 return -1;
01499 }
01500
01501 void
01502 TAO_Base_StreamEndPoint::protocol_object_set (void)
01503 {
01504 this->protocol_object_set_ = 1;
01505 }
01506
01507
01508 int
01509 TAO_Base_StreamEndPoint::is_protocol_object_set (void)
01510 {
01511 return this->protocol_object_set_;
01512 }
01513
01514 int
01515 TAO_Base_StreamEndPoint::get_callback (const char * ,
01516 TAO_AV_Callback *&)
01517 {
01518 return -1;
01519 }
01520
01521 int
01522 TAO_Base_StreamEndPoint::get_control_callback (const char * ,
01523 TAO_AV_Callback *&)
01524 {
01525 return -1;
01526 }
01527
01528 void
01529 TAO_Base_StreamEndPoint::set_flow_handler (const char *flowname,
01530 TAO_AV_Flow_Handler *handler)
01531 {
01532 if(TAO_debug_level > 1)
01533 {
01534 ACE_DEBUG ((LM_DEBUG, "(%N,%l) TAO_Base_StreamEndPoint::set_flow_handler(), flowname: %s\n", flowname));
01535 }
01536 ACE_CString flow_name_key (flowname);
01537 if (this->flow_handler_map_.bind (flow_name_key, handler) != 0)
01538 ACE_ERROR ((LM_ERROR,
01539 "Error in storing flow handler\n"));
01540 }
01541
01542 void
01543 TAO_Base_StreamEndPoint::set_control_flow_handler (const char *flowname,
01544 TAO_AV_Flow_Handler *handler)
01545 {
01546 ACE_CString flow_name_key (flowname);
01547 if (this->control_flow_handler_map_.bind (flow_name_key, handler) != 0)
01548 ACE_ERROR ((LM_ERROR,
01549 "Error in storing control flow handler\n"));
01550 }
01551
01552
01553
01554
01555
01556
01557
01558 TAO_StreamEndPoint::TAO_StreamEndPoint (void)
01559 :flow_count_ (0),
01560 flow_num_ (0),
01561 mcast_port_ (ACE_DEFAULT_MULTICAST_PORT+1)
01562 {
01563
01564 this->mcast_addr_.set (ACE_DEFAULT_MULTICAST_ADDR);
01565 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::TAO_StreamEndPoint::mcast_addr = %s", this->mcast_addr_.c_str ()));
01566
01567 }
01568
01569
01570 CORBA::Boolean
01571 TAO_StreamEndPoint::connect (AVStreams::StreamEndPoint_ptr responder,
01572 AVStreams::streamQoS &qos,
01573 const AVStreams::flowSpec &the_spec)
01574 {
01575 if (TAO_debug_level > 0)
01576 ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect ()\n"));
01577 CORBA::Boolean retv = 0;
01578 this->peer_sep_ = AVStreams::StreamEndPoint::_duplicate (responder);
01579 try
01580 {
01581 if (!CORBA::is_nil (this->negotiator_.in ()))
01582 {
01583 ACE_DEBUG ((LM_DEBUG,
01584 "NEGOTIATOR AVIALABLE\n"));
01585
01586 CORBA::Any_var negotiator_any = responder->get_property_value ("Negotiator");
01587
01588 AVStreams::Negotiator_ptr peer_negotiator;
01589 negotiator_any.in () >>= peer_negotiator;
01590 if (!CORBA::is_nil (peer_negotiator))
01591 {
01592 CORBA::Boolean result =
01593 this->negotiator_->negotiate (peer_negotiator,
01594 qos);
01595 if (!result)
01596 if (TAO_debug_level > 0)
01597 ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect (): negotiate failed\n"));
01598 }
01599 }
01600 }
01601 catch (const CORBA::Exception& ex)
01602 {
01603 ex._tao_print_exception ("TAO_StreamEndPoint::negotiate");
01604 }
01605
01606 try
01607 {
01608 if (this->protocols_.length () > 0)
01609 {
01610
01611 CORBA::Any_var protocols_any =
01612 responder->get_property_value ("AvailableProtocols");
01613 AVStreams::protocolSpec peer_protocols;
01614 AVStreams::protocolSpec *temp_protocols = 0;
01615 protocols_any.in () >>= temp_protocols;
01616 peer_protocols = *temp_protocols;
01617 for (u_int i=0;i<peer_protocols.length ();i++)
01618 {
01619 for (u_int j=0;j<this->protocols_.length ();j++)
01620 if (ACE_OS::strcmp (peer_protocols [i],
01621 this->protocols_[j]) == 0)
01622 {
01623
01624 this->protocol_ = CORBA::string_dup (peer_protocols [i]);
01625 break;
01626 }
01627 }
01628 }
01629 }
01630 catch (const CORBA::Exception&)
01631 {
01632 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Availableprotocols property not defined\n"));
01633 }
01634 try
01635 {
01636 AVStreams::streamQoS network_qos;
01637 if (qos.length () > 0)
01638 {
01639 if (TAO_debug_level > 0)
01640 ACE_DEBUG ((LM_DEBUG,
01641 "QoS is Specified\n"));
01642
01643 int result = this->translate_qos (qos,
01644 network_qos);
01645 if (result != 0)
01646 if (TAO_debug_level > 0)
01647 ACE_DEBUG ((LM_DEBUG,
01648 "QoS translation failed\n"));
01649
01650 this->qos ().set (network_qos);
01651 }
01652
01653
01654 AVStreams::flowSpec flow_spec (the_spec);
01655 this->handle_preconnect (flow_spec);
01656
01657 if (TAO_debug_level > 0)
01658 ACE_DEBUG ((LM_DEBUG,
01659 "TAO_StreamEndPoint::connect: flow_spec_length = %d\n",
01660 flow_spec.length ()));
01661 u_int i;
01662 for (i=0;i<flow_spec.length ();i++)
01663 {
01664 TAO_Forward_FlowSpec_Entry *entry = 0;
01665 ACE_NEW_RETURN (entry,
01666 TAO_Forward_FlowSpec_Entry,
01667 0);
01668
01669 if (entry->parse (flow_spec[i]) == -1)
01670 return 0;
01671
01672 if (TAO_debug_level > 0)
01673 ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect: %s\n", entry->entry_to_string ()));
01674
01675 this->forward_flow_spec_set.insert (entry);
01676 }
01677
01678 int result =TAO_AV_CORE::instance ()->init_forward_flows (this,
01679 this->forward_flow_spec_set,
01680 TAO_AV_Core::TAO_AV_ENDPOINT_A,
01681 flow_spec);
01682
01683
01684 if (result < 0)
01685 ACE_ERROR_RETURN ((LM_ERROR, "%N:%l TAO_AV_Core::init_forward_flows failed\n"), 0);
01686
01687
01688 AVStreams::StreamEndPoint_var streamendpoint = this->_this ();
01689
01690 retv = responder->request_connection (streamendpoint.in (),
01691 0,
01692 network_qos,
01693 flow_spec);
01694
01695 if (TAO_debug_level > 0)
01696 ACE_DEBUG ((LM_DEBUG, "%N:%l request_connection returned %d\n", retv));
01697
01698 if (retv == 0)
01699 return retv;
01700 for (i=0;i<flow_spec.length ();i++)
01701 {
01702 TAO_Reverse_FlowSpec_Entry *entry = 0;
01703 ACE_NEW_RETURN (entry,
01704 TAO_Reverse_FlowSpec_Entry,
01705 0);
01706 if (entry->parse (flow_spec[i]) == -1)
01707 ACE_ERROR_RETURN ((LM_ERROR,
01708 "Reverse_Flow_Spec_Set::parse failed\n"),
01709 0);
01710
01711 if (TAO_debug_level > 0)
01712 ACE_DEBUG ((LM_DEBUG,
01713 "TAO_StreamEndPoint::Connect: Reverse Flow Spec %s\n",
01714 entry->entry_to_string ()));
01715
01716 this->reverse_flow_spec_set.insert (entry);
01717 }
01718
01719 result = TAO_AV_CORE::instance ()->init_reverse_flows (this,
01720 this->forward_flow_spec_set,
01721 this->reverse_flow_spec_set,
01722 TAO_AV_Core::TAO_AV_ENDPOINT_A);
01723 if (result < 0)
01724 ACE_ERROR_RETURN ((LM_ERROR,
01725 "TAO_AV_Core::init_reverse_flows failed\n"),
01726 0);
01727
01728
01729 retv = this->handle_postconnect (flow_spec);
01730 }
01731 catch (const CORBA::Exception& ex)
01732 {
01733 ex._tao_print_exception ("TAO_StreamEndPoint::connect");
01734 return 0;
01735 }
01736 return retv;
01737 }
01738
01739 int
01740 TAO_StreamEndPoint::translate_qos (const AVStreams::streamQoS& application_qos,
01741 AVStreams::streamQoS& network_qos)
01742 {
01743 u_int len = application_qos.length ();
01744 network_qos.length (len);
01745 for (u_int i=0;i<len;i++)
01746 {
01747 network_qos [i].QoSType = application_qos [i].QoSType;
01748 network_qos [i].QoSParams = application_qos [i].QoSParams;
01749 }
01750 return 0;
01751 }
01752
01753
01754
01755
01756 void
01757 TAO_StreamEndPoint::stop (const AVStreams::flowSpec &flow_spec)
01758 {
01759
01760 this->handle_stop (flow_spec);
01761
01762 if (flow_spec.length () > 0)
01763 {
01764
01765 for (u_int i=0;i<flow_spec.length ();i++)
01766 {
01767 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01768 for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01769 begin != end; ++begin)
01770 {
01771 TAO_Forward_FlowSpec_Entry entry;
01772 entry.parse (flow_spec[i]);
01773 if (ACE_OS::strcmp ((*begin)->flowname (), entry.flowname ()) == 0)
01774 {
01775 TAO_FlowSpec_Entry *entry = *begin;
01776
01777 if (entry->handler() != 0)
01778 entry->handler ()->stop (entry->role ());
01779 if (entry->control_handler () != 0)
01780 entry->control_handler ()->stop (entry->role ());
01781 break;
01782 }
01783 }
01784 }
01785 }
01786 else
01787 {
01788 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01789 for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01790 begin != end; ++begin)
01791 {
01792 TAO_FlowSpec_Entry *entry = *begin;
01793
01794 if (entry->handler() != 0)
01795 entry->handler ()->stop (entry->role ());
01796 if (entry->control_handler () != 0)
01797 entry->control_handler ()->stop (entry->role ());
01798 }
01799 }
01800 }
01801
01802
01803
01804 void
01805 TAO_StreamEndPoint::start (const AVStreams::flowSpec &flow_spec)
01806 {
01807 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::start\n"));
01808
01809 this->handle_start (flow_spec);
01810
01811 if (flow_spec.length () > 0)
01812 {
01813
01814 for (u_int i=0;i<flow_spec.length ();i++)
01815 {
01816 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01817 for (TAO_AV_FlowSpecSetItor forward_begin = this->forward_flow_spec_set.begin ();
01818 forward_begin != end; ++forward_begin)
01819 {
01820 TAO_FlowSpec_Entry *entry = *forward_begin;
01821 if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
01822 {
01823
01824 if (entry->handler () != 0)
01825 {
01826 entry->handler ()->start (entry->role ());
01827 }
01828 if (entry->control_handler () != 0)
01829 {
01830 entry->control_handler ()->start (entry->role ());
01831 }
01832 }
01833 }
01834
01835 end = this->reverse_flow_spec_set.end ();
01836 for (TAO_AV_FlowSpecSetItor reverse_begin = this->reverse_flow_spec_set.begin ();
01837 reverse_begin != end; ++reverse_begin)
01838 {
01839 TAO_FlowSpec_Entry *entry = *reverse_begin;
01840 if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
01841 {
01842
01843 if (entry->handler () != 0)
01844 {
01845 entry->handler ()->start (entry->role ());
01846 }
01847 if (entry->control_handler () != 0)
01848 {
01849 entry->control_handler ()->start (entry->role ());
01850 }
01851 }
01852 }
01853 }
01854 }
01855 else
01856 {
01857 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01858 for (TAO_AV_FlowSpecSetItor forwardbegin = this->forward_flow_spec_set.begin ();
01859 forwardbegin != end; ++forwardbegin)
01860 {
01861 TAO_FlowSpec_Entry *entry = *forwardbegin;
01862 if (entry->handler () != 0)
01863 {
01864 entry->handler ()->start (entry->role ());
01865 }
01866 if (entry->control_handler () != 0)
01867 {
01868 entry->control_handler ()->start (entry->role ());
01869 }
01870 }
01871
01872 end = this->reverse_flow_spec_set.end ();
01873 for (TAO_AV_FlowSpecSetItor reversebegin = this->reverse_flow_spec_set.begin ();
01874 reversebegin != end; ++reversebegin)
01875 {
01876 TAO_FlowSpec_Entry *entry = *reversebegin;
01877
01878 if (entry->handler () != 0)
01879 {
01880 entry->handler ()->start (entry->role ());
01881 }
01882 if (entry->control_handler () != 0)
01883 {
01884 entry->control_handler ()->start (entry->role ());
01885 }
01886 }
01887 }
01888 }
01889
01890
01891 void
01892 TAO_StreamEndPoint::destroy (const AVStreams::flowSpec &flow_spec)
01893 {
01894 CORBA::Any_var vdev_any = this->get_property_value ("Related_VDev");
01895
01896 AVStreams::VDev_ptr vdev;
01897
01898 vdev_any.in() >>= vdev;
01899 CORBA::Any_var mc_any = vdev->get_property_value ("Related_MediaCtrl");
01900
01901
01902
01903 CORBA::Object_var obj;
01904 mc_any.in() >>= CORBA::Any::to_object( obj.out() );
01905
01906 AVStreams::MediaControl_var media_ctrl =
01907 AVStreams::MediaControl::_narrow( obj.in() );
01908
01909
01910
01911 if ( !CORBA::is_nil( vdev ) )
01912 {
01913 PortableServer::ServantBase_var vdev_servant =
01914 TAO_AV_CORE::instance()->poa()->reference_to_servant ( vdev );
01915 TAO_AV_Core::deactivate_servant (vdev_servant.in());
01916 }
01917
01918 if ( !CORBA::is_nil ( media_ctrl.in () ) )
01919 {
01920 PortableServer::ServantBase_var mc_servant =
01921 TAO_AV_CORE::instance()->poa()->reference_to_servant (media_ctrl.in());
01922 TAO_AV_Core::deactivate_servant (mc_servant.in());
01923 }
01924
01925 int result = TAO_AV_Core::deactivate_servant (this);
01926 if (result < 0)
01927 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::destroy failed\n"));
01928
01929 if (flow_spec.length () > 0)
01930 {
01931 for (u_int i=0;i<flow_spec.length ();i++)
01932 {
01933 {
01934 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01935 for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01936 begin != end; ++begin)
01937 {
01938 TAO_FlowSpec_Entry *entry = *begin;
01939 TAO_Tokenizer flow_name (flow_spec [i], '\\');
01940 if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
01941 {
01942 if (entry->protocol_object ())
01943 {
01944 entry->protocol_object ()->destroy ();
01945 }
01946 break;
01947 }
01948 }
01949 }
01950 {
01951 TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
01952 for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
01953 begin != end; ++begin)
01954 {
01955 TAO_FlowSpec_Entry *entry = *begin;
01956 TAO_Tokenizer flow_name (flow_spec [i], '\\');
01957 if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
01958 {
01959 if (entry->protocol_object ())
01960 {
01961 entry->protocol_object ()->destroy ();
01962 }
01963 break;
01964 }
01965 }
01966 }
01967 }
01968 }
01969 else
01970 {
01971 {
01972 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01973 for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01974 begin != end; ++begin)
01975 {
01976 TAO_FlowSpec_Entry *entry = *begin;
01977 if (entry->protocol_object ())
01978 {
01979 entry->protocol_object ()->stop ();
01980
01981 ACE_CString control_flowname =
01982 TAO_AV_Core::get_control_flowname (entry->flowname ());
01983 TAO_AV_CORE::instance()->remove_acceptor(entry->flowname());
01984 TAO_AV_CORE::instance()->remove_acceptor(control_flowname.c_str());
01985
01986 entry->protocol_object ()->destroy ();
01987 }
01988 }
01989 }
01990 {
01991 TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
01992 for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
01993 begin != end; ++begin)
01994 {
01995 TAO_FlowSpec_Entry *entry = *begin;
01996 if (entry->protocol_object ())
01997 {
01998 entry->protocol_object ()->stop ();
01999
02000 ACE_CString control_flowname =
02001 TAO_AV_Core::get_control_flowname (entry->flowname ());
02002 TAO_AV_CORE::instance()->remove_connector(entry->flowname());
02003 TAO_AV_CORE::instance()->remove_connector(control_flowname.c_str());
02004 entry->protocol_object ()->destroy ();
02005
02006 }
02007 }
02008 }
02009 }
02010
02011
02012
02013
02014 }
02015
02016
02017
02018 CORBA::Boolean
02019 TAO_StreamEndPoint::request_connection (AVStreams::StreamEndPoint_ptr ,
02020 CORBA::Boolean ,
02021 AVStreams::streamQoS &qos,
02022 AVStreams::flowSpec &flow_spec)
02023
02024 {
02025 if (TAO_debug_level > 0)
02026 ACE_DEBUG ((LM_DEBUG,
02027 "\n(%P|%t) TAO_StreamEndPoint::request_connection called"));
02028
02029 int result = 0;
02030 try
02031 {
02032 AVStreams::streamQoS network_qos;
02033 if (qos.length () > 0)
02034 {
02035 if (TAO_debug_level > 0)
02036 ACE_DEBUG ((LM_DEBUG,
02037 "QoS is Specified\n"));
02038
02039 int result = this->translate_qos (qos, network_qos);
02040 if (result != 0)
02041 if (TAO_debug_level > 0)
02042 ACE_DEBUG ((LM_DEBUG, "QoS translation failed\n"));
02043
02044 this->qos ().set (network_qos);
02045 }
02046
02047 if (TAO_debug_level > 0)
02048 ACE_DEBUG ((LM_DEBUG,
02049 "\n(%P|%t) TAO_StreamEndPoint::request_connection: "
02050 "flowspec has length = %d and the strings are:\n",
02051 flow_spec.length ()));
02052 CORBA::ULong i;
02053
02054 for (i=0;i<flow_spec.length ();i++)
02055 {
02056 TAO_Forward_FlowSpec_Entry *entry = 0;
02057 ACE_NEW_RETURN (entry,
02058 TAO_Forward_FlowSpec_Entry,
02059 0);
02060
02061 CORBA::String_var string_entry = CORBA::string_dup (flow_spec[i]);
02062
02063 if(TAO_debug_level > 0)
02064 ACE_DEBUG(( LM_DEBUG,
02065 "%N:%l Parsing flow spec: [%s]\n",
02066 string_entry.in ()));
02067
02068 if (entry->parse (string_entry.in ()) == -1)
02069 {
02070 if (TAO_debug_level > 0)
02071 ACE_DEBUG ((LM_DEBUG,
02072 "%N:%l Error parsing flow_spec: [%s]\n",
02073 string_entry.in ()));
02074 return 0;
02075 }
02076 if (TAO_debug_level > 0)
02077 ACE_DEBUG ((LM_DEBUG,
02078 "TAO_StreamEndPoint::request_connection flow spec [%s]\n",
02079 entry->entry_to_string ()));
02080
02081 this->forward_flow_spec_set.insert (entry);
02082 }
02083
02084 result = TAO_AV_CORE::instance ()->init_forward_flows (this,
02085 this->forward_flow_spec_set,
02086 TAO_AV_Core::TAO_AV_ENDPOINT_B,
02087 flow_spec);
02088
02089 if (result < 0)
02090 return 0;
02091
02092
02093 result = this->handle_connection_requested (flow_spec);
02094 }
02095 catch (const CORBA::Exception& ex)
02096 {
02097 ex._tao_print_exception ("TAO_StreamEndpoint::request_connection");
02098 return 0;
02099 }
02100 return result;
02101 }
02102
02103 int
02104 TAO_StreamEndPoint::change_qos (AVStreams::streamQoS &new_qos,
02105 const AVStreams::flowSpec &the_flows)
02106 {
02107 if (TAO_debug_level > 0)
02108 ACE_DEBUG ((LM_DEBUG,
02109 "TAO_StreamEndPoint::change_qos\n"));
02110
02111 TAO_AV_QoS qos (new_qos);
02112 for (int i = 0; (unsigned) i < the_flows.length (); i++)
02113 {
02114 TAO_Forward_FlowSpec_Entry entry;
02115 entry.parse (the_flows [i]);
02116 ACE_CString flow_name_key (entry.flowname ());
02117 Flow_Handler_Map_Entry *handler_entry;
02118 if (this->flow_handler_map_.find (flow_name_key,
02119 handler_entry) == 0)
02120 {
02121 AVStreams::QoS flow_qos;
02122 if (qos.get_flow_qos (entry.flowname (), flow_qos) != 0)
02123 ACE_DEBUG ((LM_DEBUG,
02124 "New QoS for the flow %s is not specified\n",
02125 entry.flowname ()));
02126 int result;
02127 result = handler_entry->int_id_->change_qos (flow_qos);
02128 if (result != 0)
02129 ACE_ERROR_RETURN ((LM_ERROR,
02130 "Modifying QoS Failed\n"),
02131 -1);
02132
02133 }
02134 }
02135 return 0;
02136 }
02137
02138
02139 CORBA::Boolean
02140 TAO_StreamEndPoint::modify_QoS (AVStreams::streamQoS &new_qos,
02141 const AVStreams::flowSpec &the_flows)
02142 {
02143 if (TAO_debug_level > 0)
02144 ACE_DEBUG ((LM_DEBUG,
02145 "TAO_StreamEndPoint::modify_QoS\n"));
02146
02147 int result = this->change_qos (new_qos, the_flows);
02148
02149 if (result != 0)
02150 return 0;
02151
02152 return 1;
02153
02154 }
02155
02156
02157
02158 CORBA::Boolean
02159 TAO_StreamEndPoint::set_protocol_restriction (const AVStreams::protocolSpec &protocols)
02160 {
02161 try
02162 {
02163 CORBA::Any protocol_restriction_any;
02164
02165 protocol_restriction_any <<= protocols;
02166 this->define_property ("ProtocolRestriction",
02167 protocol_restriction_any);
02168 this->protocols_ = protocols;
02169 }
02170 catch (const CORBA::Exception& ex)
02171 {
02172 ex._tao_print_exception (
02173 "TAO_StreamEndPoint::set_protocol_restriction");
02174 return 0;
02175 }
02176 return 1;
02177 }
02178
02179
02180 void
02181 TAO_StreamEndPoint::disconnect (const AVStreams::flowSpec &the_spec)
02182 {
02183 ACE_UNUSED_ARG (the_spec);
02184 }
02185
02186
02187
02188 void
02189 TAO_StreamEndPoint::set_FPStatus (const AVStreams::flowSpec &,
02190 const char *fp_name,
02191 const CORBA::Any &fp_settings)
02192 {
02193 if (ACE_OS::strcmp (fp_name, "SFP1.0") != 0)
02194 return;
02195 fp_settings >>= this->sfp_status_;
02196
02197 }
02198
02199
02200 CORBA::Object_ptr
02201 TAO_StreamEndPoint::get_fep (const char *flow_name)
02202 {
02203 ACE_CString fep_name_key (flow_name);
02204 AVStreams::FlowEndPoint_var fep_entry;
02205 if (this->fep_map_.find (fep_name_key, fep_entry) == 0)
02206 return fep_entry._retn();
02207 return 0;
02208 }
02209
02210 char*
02211 TAO_StreamEndPoint::add_fep_i_add_property (AVStreams::FlowEndPoint_ptr fep)
02212 {
02213 ACE_CString flow_name;
02214
02215 try
02216 {
02217
02218
02219 flow_name = "flow";
02220 char tmp[255];
02221 ACE_OS::sprintf (tmp, "%u", this->flow_num_++);
02222 flow_name += tmp;
02223
02224 CORBA::Any flowname_any;
02225 flowname_any <<= flow_name.c_str ();
02226 fep->define_property ("Flow",
02227 flowname_any);
02228 }
02229 catch (const CORBA::Exception& ex)
02230 {
02231 ex._tao_print_exception ("TAO_StreamEndPoint::add_fep");
02232 return 0;
02233 }
02234 return ACE_OS::strdup( flow_name.c_str () );
02235 }
02236
02237 char*
02238 TAO_StreamEndPoint::add_fep_i (AVStreams::FlowEndPoint_ptr fep)
02239 {
02240 CORBA::String_var flow_name;
02241 try
02242 {
02243 CORBA::Any_var flow_name_any =
02244 fep->get_property_value ("FlowName");
02245
02246 const char *tmp = 0;
02247 flow_name_any >>= tmp;
02248 flow_name = CORBA::string_dup (tmp);
02249 }
02250 catch (const CORBA::Exception&)
02251 {
02252 flow_name =
02253 this->add_fep_i_add_property (fep);
02254 }
02255 return flow_name._retn ();
02256 }
02257
02258 char *
02259 TAO_StreamEndPoint::add_fep (CORBA::Object_ptr fep_obj)
02260 {
02261 AVStreams::FlowEndPoint_var fep =
02262 AVStreams::FlowEndPoint::_narrow (fep_obj);
02263
02264 CORBA::String_var flow_name =
02265 this->add_fep_i (fep.in ());
02266
02267 try
02268 {
02269 fep->lock ();
02270
02271
02272 ACE_CString fep_name_key (CORBA::string_dup (flow_name.in ()));
02273 if (this->fep_map_.bind (fep_name_key, AVStreams::FlowEndPoint::_duplicate (fep.in ())) != 0)
02274 {
02275 throw AVStreams::streamOpFailed ();
02276 }
02277
02278 this->flow_count_++;
02279 this->flows_.length (this->flow_count_);
02280 this->flows_[this->flow_count_-1] = flow_name;
02281
02282 CORBA::Any flows_any;
02283 flows_any <<= this->flows_;
02284 this->define_property ("Flows",
02285 flows_any);
02286 }
02287 catch (const CORBA::Exception& ex)
02288 {
02289 ex._tao_print_exception ("TAO_StreamEndPoint::add_fep");
02290 return 0;
02291 }
02292 return flow_name._retn ();
02293 }
02294
02295
02296 void
02297 TAO_StreamEndPoint::remove_fep (const char *flow_name)
02298 {
02299 try
02300 {
02301 ACE_CString fep_name_key (flow_name);
02302 AVStreams::FlowEndPoint_var fep_entry;
02303
02304 if (this->fep_map_.unbind (fep_name_key, fep_entry)!= 0)
02305 throw AVStreams::streamOpFailed ();
02306
02307 AVStreams::flowSpec new_flows (this->flows_.length ());
02308 new_flows.length(this->flows_.length ());
02309 for (u_int i=0, j=0 ; i <this->flows_.length (); i++)
02310 if (ACE_OS::strcmp (flow_name, this->flows_[i]) != 0)
02311 new_flows[j++] = this->flows_[i];
02312
02313 CORBA::Any flows;
02314 flows <<= new_flows;
02315 this->flows_ = new_flows;
02316 this->define_property ("Flows",
02317 flows);
02318 }
02319 catch (const CORBA::Exception& ex)
02320 {
02321 ex._tao_print_exception ("TAO_StreamEndPoint::remove_fep");
02322 }
02323 }
02324
02325
02326 void
02327 TAO_StreamEndPoint::set_negotiator (AVStreams::Negotiator_ptr new_negotiator)
02328 {
02329 try
02330 {
02331 CORBA::Any negotiator;
02332 negotiator <<= new_negotiator;
02333 this->define_property ("Negotiator",
02334 negotiator);
02335 this->negotiator_ = AVStreams::Negotiator::_duplicate (new_negotiator);
02336 }
02337 catch (const CORBA::Exception& ex)
02338 {
02339 ex._tao_print_exception (
02340 "TAO_StreamEndPoint::set_negotiator");
02341 }
02342 }
02343
02344
02345
02346 void
02347 TAO_StreamEndPoint::set_key (const char *flow_name,
02348 const AVStreams::key & the_key)
02349 {
02350 try
02351 {
02352 this->key_ = the_key;
02353 CORBA::Any PublicKey;
02354 PublicKey <<= the_key;
02355 char PublicKey_property [BUFSIZ];
02356 ACE_OS::sprintf (PublicKey_property, "%s_PublicKey", flow_name);
02357 this->define_property (PublicKey_property,
02358 PublicKey);
02359 }
02360 catch (const CORBA::Exception& ex)
02361 {
02362 ex._tao_print_exception ("TAO_StreamEndPoint::set_key");
02363 }
02364 }
02365
02366
02367 void
02368 TAO_StreamEndPoint::set_source_id (CORBA::Long source_id)
02369 {
02370 this->source_id_ = source_id;
02371 }
02372
02373 CORBA::Boolean
02374 TAO_StreamEndPoint::multiconnect (AVStreams::streamQoS &,
02375 AVStreams::flowSpec &)
02376 {
02377 if (TAO_debug_level > 0)
02378 ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::multiconnect\n"));
02379 return 0;
02380 }
02381
02382 TAO_StreamEndPoint::~TAO_StreamEndPoint (void)
02383 {
02384
02385 TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
02386 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02387
02388 int i=0;
02389
02390
02391 for ( ; begin != end; ++begin, ++i)
02392 {
02393
02394
02395 TAO_FlowSpec_Entry *entry = *begin;
02396 delete entry;
02397
02398 }
02399 begin = this->reverse_flow_spec_set.begin ();
02400 end = this->reverse_flow_spec_set.end ();
02401 i = 0;
02402 for (; begin != end; ++begin)
02403 {
02404
02405
02406 TAO_FlowSpec_Entry *entry = *begin;
02407 delete entry;
02408
02409 }
02410 }
02411
02412
02413
02414
02415
02416
02417 TAO_StreamEndPoint_A::TAO_StreamEndPoint_A (void)
02418 {
02419 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamEndPoint_A::TAO_StreamEndPoint_A: created\n"));
02420 }
02421
02422
02423 CORBA::Boolean
02424 TAO_StreamEndPoint_A::multiconnect (AVStreams::streamQoS &stream_qos,
02425 AVStreams::flowSpec &flow_spec)
02426 {
02427 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPointA::multiconnect\n"));
02428 try
02429 {
02430 int result = 0;
02431 TAO_AV_QoS qos (stream_qos);
02432 for (u_int i=0;i< flow_spec.length ();i++)
02433 {
02434 TAO_Forward_FlowSpec_Entry *forward_entry = 0;
02435 ACE_NEW_RETURN (forward_entry,
02436 TAO_Forward_FlowSpec_Entry,
02437 0);
02438 forward_entry->parse (flow_spec[i]);
02439 ACE_CString mcast_key (forward_entry->flowname ());
02440 AVStreams::FlowEndPoint_var flow_endpoint;
02441
02442
02443
02444
02445
02446
02447
02448
02449
02450 if (this->fep_map_.find (mcast_key, flow_endpoint) == 0)
02451 {
02452 try
02453 {
02454 AVStreams::QoS flow_qos;
02455 result = qos.get_flow_qos (forward_entry->flowname (), flow_qos);
02456 if (result < 0)
02457 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "QoS not found for %s\n", forward_entry->flowname ()));
02458
02459 AVStreams::FlowProducer_var producer;
02460 producer = AVStreams::FlowProducer::_narrow (flow_endpoint.in());
02461
02462
02463 if (!CORBA::is_nil (producer.in ()))
02464 {
02465 AVStreams::FlowConnection_var flow_connection;
02466 try
02467 {
02468 if (CORBA::is_nil (this->streamctrl_.in ()))
02469 {
02470 CORBA::Any_var streamctrl_any;
02471 streamctrl_any = this->get_property_value ("Related_StreamCtrl");
02472 AVStreams::StreamCtrl_ptr streamctrl;
02473 streamctrl_any.in () >>= streamctrl;
02474 this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (streamctrl);
02475 }
02476
02477 CORBA::Object_var flow_connection_obj =
02478 this->streamctrl_->get_flow_connection (forward_entry->flowname ());
02479 flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in ());
02480 }
02481 catch (const CORBA::Exception&)
02482 {
02483 TAO_FlowConnection *flowConnection;
02484 ACE_NEW_RETURN (flowConnection,
02485 TAO_FlowConnection,
02486 0);
02487
02488 flowConnection->set_mcast_addr (this->mcast_addr_, this->mcast_port_);
02489 this->mcast_port_++;
02490 flowConnection->set_protocol (forward_entry->carrier_protocol_str ());
02491 flow_connection = flowConnection->_this ();
02492 this->streamctrl_->set_flow_connection (forward_entry->flowname (),
02493 flow_connection.in ());
02494 }
02495 if (ACE_OS::strcmp (forward_entry->flow_protocol_str (), "") != 0)
02496 {
02497 CORBA::Any fp_settings;
02498 flow_connection->use_flow_protocol (forward_entry->flow_protocol_str (),
02499 fp_settings);
02500 }
02501 result = flow_connection->add_producer (producer.in (),
02502 flow_qos);
02503 if (result == 0)
02504 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_A::multiconnect: add_producer failed\n"), 0);
02505 }
02506 }
02507 catch (const CORBA::Exception& ex)
02508 {
02509
02510 ex._tao_print_exception (
02511 "FlowProducer::_narrow");
02512 ACE_ERROR_RETURN ((LM_ERROR, "sep_a doesn't contain a flowproducer"), 0);
02513 }
02514 }
02515 else
02516 {
02517 ACE_INET_Addr *mcast_addr;
02518 TAO_FlowSpec_Entry *entry = 0;
02519 result = this->mcast_entry_map_.find (mcast_key, entry);
02520 if (result == 0)
02521 {
02522 mcast_addr = dynamic_cast<ACE_INET_Addr *> (entry->address ());
02523 ACE_TCHAR str_addr [BUFSIZ];
02524 result = mcast_addr->addr_to_string (str_addr, BUFSIZ);
02525 if (result < 0)
02526 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPointA::multiconnect ::addr_to_string failed\n"), 0);
02527 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint_A::multiconnect:%s\n", str_addr));
02528 TAO_Forward_FlowSpec_Entry new_entry (entry->flowname (),
02529 entry->direction_str (),
02530 entry->format (),
02531 entry->flow_protocol_str (),
02532 entry->carrier_protocol_str (),
02533 entry->address ());
02534 flow_spec[i] = CORBA::string_dup (new_entry.entry_to_string ());
02535 }
02536 else
02537 {
02538
02539 switch (forward_entry->direction ())
02540 {
02541 case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
02542 {
02543 ACE_NEW_RETURN (mcast_addr,
02544 ACE_INET_Addr,
02545 0);
02546 mcast_addr->set (this->mcast_port_, this->mcast_addr_.c_str ());
02547 this->mcast_port_++;
02548 ACE_TCHAR buf[BUFSIZ];
02549 mcast_addr->addr_to_string (buf, BUFSIZ);
02550 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", buf));
02551 TAO_Forward_FlowSpec_Entry *new_entry;
02552 ACE_NEW_RETURN (new_entry,
02553 TAO_Forward_FlowSpec_Entry (forward_entry->flowname (),
02554 forward_entry->direction_str (),
02555 forward_entry->format (),
02556 forward_entry->flow_protocol_str (),
02557 forward_entry->carrier_protocol_str (),
02558 mcast_addr),
02559 0);
02560 flow_spec[i] = CORBA::string_dup (new_entry->entry_to_string ());
02561
02562
02563 this->forward_flow_spec_set.insert (new_entry);
02564 TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry ();
02565 result = acceptor_registry->open (this,
02566 TAO_AV_CORE::instance (),
02567 this->forward_flow_spec_set);
02568 if (result < 0)
02569 ACE_ERROR_RETURN ((LM_ERROR, "Acceptor_Registry::open failed\n"), 0);
02570 result = this->mcast_entry_map_.bind (mcast_key, new_entry);
02571 if (result < 0)
02572 ACE_ERROR_RETURN ((LM_ERROR, "mcast_entry::bind failed"), 0);
02573 }
02574 break;
02575 case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
02576
02577 break;
02578 default:
02579 break;
02580 }
02581 }
02582 }
02583 }
02584 }
02585 catch (const CORBA::Exception& ex)
02586 {
02587 ex._tao_print_exception (
02588 "TAO_StreamEndPoint_A::multiconnect");
02589 return 0;
02590 }
02591 return 1;
02592 }
02593
02594
02595 CORBA::Boolean
02596 TAO_StreamEndPoint_A::connect_leaf (AVStreams::StreamEndPoint_B_ptr ,
02597 AVStreams::streamQoS & ,
02598 const AVStreams::flowSpec & )
02599 {
02600 throw AVStreams::notSupported ();
02601 }
02602
02603
02604 void
02605 TAO_StreamEndPoint_A::disconnect_leaf (AVStreams::StreamEndPoint_B_ptr ,
02606 const AVStreams::flowSpec & )
02607
02608 {
02609
02610 throw AVStreams::notSupported ();
02611
02612 }
02613
02614 TAO_StreamEndPoint_A::~TAO_StreamEndPoint_A (void)
02615 {
02616 }
02617
02618
02619
02620
02621
02622 TAO_StreamEndPoint_B::TAO_StreamEndPoint_B (void)
02623 {
02624 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
02625 "\n(%P|%t) TAO_StreamEndPoint_B::TAO_StreamEndPoint_B: created"));
02626 }
02627
02628 CORBA::Boolean
02629 TAO_StreamEndPoint_B::multiconnect (AVStreams::streamQoS &stream_qos,
02630 AVStreams::flowSpec &flow_spec)
02631 {
02632 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint_B::multiconnect\n"));
02633 try
02634 {
02635 int result = 0;
02636 TAO_AV_QoS qos (stream_qos);
02637 for (u_int i=0;i< flow_spec.length ();i++)
02638 {
02639 TAO_Forward_FlowSpec_Entry *forward_entry;
02640 ACE_NEW_RETURN (forward_entry,
02641 TAO_Forward_FlowSpec_Entry,
02642 0);
02643 forward_entry->parse (flow_spec[i]);
02644 ACE_CString mcast_key (forward_entry->flowname ());
02645 AVStreams::FlowEndPoint_var flow_endpoint;
02646 if (this->fep_map_.find (mcast_key, flow_endpoint ) == 0)
02647 {
02648 AVStreams::FlowConsumer_var consumer;
02649 try
02650 {
02651 consumer = AVStreams::FlowConsumer::_narrow (flow_endpoint.in ());
02652 }
02653 catch (const CORBA::Exception& ex)
02654 {
02655 ex._tao_print_exception (
02656 "FlowConsumer::_narrow");
02657 ACE_ERROR_RETURN ((LM_ERROR, "sep_b doesn't contain a flowconsumer"), 0);
02658 }
02659 AVStreams::QoS flow_qos;
02660 result = qos.get_flow_qos (forward_entry->flowname (), flow_qos);
02661 if (result < 0)
02662 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "QoS not found for %s", forward_entry->flowname ()));
02663 AVStreams::FlowConnection_var flow_connection;
02664 try
02665 {
02666 if (CORBA::is_nil (this->streamctrl_.in ()))
02667 {
02668 CORBA::Any_var streamctrl_any;
02669 streamctrl_any = this->get_property_value ("Related_StreamCtrl");
02670 AVStreams::StreamCtrl_ptr streamctrl;
02671 streamctrl_any.in () >>= streamctrl;
02672 this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (streamctrl);
02673 }
02674 CORBA::Object_var flow_connection_obj =
02675 this->streamctrl_->get_flow_connection (forward_entry->flowname ());
02676 flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in ());
02677 }
02678 catch (const CORBA::Exception& ex)
02679 {
02680 ex._tao_print_exception (
02681 "TAO_StreamEndPoint_B::multiconnect::get_flow_connection");
02682 return 0;
02683 }
02684 result = flow_connection->add_consumer (consumer.in (),
02685 flow_qos);
02686 if (result == 0)
02687 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect:add_consumer failed\n"), 0);
02688 }
02689 else
02690 {
02691 TAO_FlowSpec_Entry *mcast_entry = 0;
02692 ACE_INET_Addr *mcast_addr;
02693 mcast_addr = dynamic_cast<ACE_INET_Addr *> (forward_entry->address ());
02694 if (mcast_addr == 0)
02695 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect::Address missing in flowspec_entry\n"), 0);
02696 result = this->mcast_entry_map_.find (mcast_key, mcast_entry);
02697 if (result == 0)
02698 {
02699 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect::handler already found\n"), 0);
02700 }
02701 else
02702 {
02703 switch (forward_entry->direction ())
02704 {
02705 case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
02706 {
02707
02708
02709
02710
02711
02712
02713 this->forward_flow_spec_set.insert (forward_entry);
02714 TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry ();
02715 result = connector_registry->open (this,
02716 TAO_AV_CORE::instance (),
02717 this->forward_flow_spec_set);
02718 if (result < 0)
02719 ACE_ERROR_RETURN ((LM_ERROR, "connector_registry::open failed\n"), 0);
02720 result = this->mcast_entry_map_.bind (mcast_key, forward_entry);
02721 if (result < 0)
02722 ACE_ERROR_RETURN ((LM_ERROR, "dgram_mcast_handler::bind failed"), 0);
02723 }
02724 break;
02725 case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
02726
02727 break;
02728 default:
02729 break;
02730 }
02731 }
02732 }
02733 }
02734 }
02735 catch (const CORBA::Exception& ex)
02736 {
02737 ex._tao_print_exception (
02738 "TAO_StreamEndPoint_B::multiconnect");
02739 return 0;
02740 }
02741 return 1;
02742 }
02743
02744 TAO_StreamEndPoint_B::~TAO_StreamEndPoint_B (void)
02745 {
02746 }
02747
02748
02749
02750
02751
02752 TAO_VDev::TAO_VDev (void)
02753 {
02754 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
02755 "(%P|%t) TAO_VDev::TAO_VDev: created\n"));
02756 }
02757
02758
02759
02760 CORBA::Boolean
02761 TAO_VDev::set_peer (AVStreams::StreamCtrl_ptr the_ctrl,
02762 AVStreams::VDev_ptr the_peer_dev,
02763 AVStreams::streamQoS &the_qos,
02764 const AVStreams::flowSpec &the_spec)
02765 {
02766 ACE_UNUSED_ARG (the_qos);
02767 ACE_UNUSED_ARG (the_spec);
02768
02769 CORBA::Boolean result = 0;
02770 try
02771 {
02772 if (TAO_debug_level > 0)
02773 ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_VDev::set_peer: called\n"));
02774
02775
02776 CORBA::Any anyval;
02777 anyval <<= the_peer_dev;
02778 this->define_property ("Related_VDev",
02779 anyval);
02780
02781
02782 this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (the_ctrl);
02783 this->peer_ = AVStreams::VDev::_duplicate (the_peer_dev);
02784
02785 CORBA::Any_var anyptr;
02786 anyptr = this->peer_->get_property_value ("Related_MediaCtrl");
02787
02788 CORBA::Object_ptr media_ctrl_obj = 0;
02789
02790 anyptr.in () >>= CORBA::Any::to_object(media_ctrl_obj);
02791
02792 result = this->set_media_ctrl (media_ctrl_obj);
02793 }
02794 catch (const CORBA::Exception& ex)
02795 {
02796 ex._tao_print_exception ("TAO_VDev::set_peer");
02797 return 0;
02798 }
02799 return result;
02800 }
02801
02802 CORBA::Boolean
02803 TAO_VDev::set_media_ctrl (CORBA::Object_ptr media_ctrl)
02804
02805 {
02806
02807
02808 CORBA::release( media_ctrl);
02809
02810 return 1;
02811 }
02812
02813
02814 CORBA::Boolean
02815 TAO_VDev::set_Mcast_peer (AVStreams::StreamCtrl_ptr ,
02816 AVStreams::MCastConfigIf_ptr mcast_peer,
02817 AVStreams::streamQoS &,
02818 const AVStreams::flowSpec &)
02819 {
02820 this->mcast_peer_ = AVStreams::MCastConfigIf::_duplicate (mcast_peer);
02821 return 1;
02822 }
02823
02824
02825 void
02826 TAO_VDev::configure (const CosPropertyService::Property &)
02827 {
02828 }
02829
02830
02831 void
02832 TAO_VDev::set_format (const char *flowName,
02833 const char *format_name)
02834 {
02835 try
02836 {
02837 if (flowName == 0 || format_name == 0)
02838 ACE_ERROR ((LM_ERROR, "TAO_VDev::set_format: flowName or format_name is null\n"));
02839 char format_property [BUFSIZ];
02840 ACE_OS::sprintf (format_property, "%s_currFormat", flowName);
02841 CORBA::Any format;
02842 format <<= format_name;
02843 this->define_property (format_property,
02844 format);
02845 }
02846 catch (const CORBA::Exception& ex)
02847 {
02848 ex._tao_print_exception ("TAO_VDev::set_format");
02849 return;
02850 }
02851 return;
02852 }
02853
02854
02855 void
02856 TAO_VDev::set_dev_params (const char *flowName,
02857 const CosPropertyService::Properties &new_params)
02858 {
02859 try
02860 {
02861 if (flowName == 0)
02862 ACE_ERROR ((LM_ERROR, "TAO_VDev::set_dev_params:flowName is null\n"));
02863 char devParams_property[BUFSIZ];
02864 ACE_OS::sprintf (devParams_property, "%s_devParams", flowName);
02865 CORBA::Any devParams;
02866 devParams <<= new_params;
02867 this->define_property (devParams_property,
02868 devParams);
02869 }
02870 catch (const CORBA::Exception& ex)
02871 {
02872 ex._tao_print_exception ("TAO_VDev::set_dev_params");
02873 return;
02874 }
02875 return;
02876 }
02877
02878
02879 CORBA::Boolean
02880 TAO_VDev::modify_QoS (AVStreams::streamQoS &the_qos,
02881 const AVStreams::flowSpec &flowspec)
02882 {
02883 if (TAO_debug_level > 0)
02884 ACE_DEBUG ((LM_DEBUG,
02885 "TAO_VDev::modify_QoS\n"));
02886
02887 if (flowspec.length () != 0)
02888 {
02889 TAO_Forward_FlowSpec_Entry entry;
02890 entry.parse (flowspec [0]);
02891 int direction = entry.direction ();
02892 if (direction == 0)
02893 {
02894 AVStreams::StreamEndPoint_A_ptr sep_a;
02895
02896 CORBA::Any_ptr streamendpoint_a_any =
02897 this->get_property_value ("Related_StreamEndpoint");
02898
02899 *streamendpoint_a_any >>= sep_a;
02900 if (sep_a != 0)
02901 {
02902 sep_a->modify_QoS (the_qos, flowspec);
02903 }
02904 else ACE_DEBUG ((LM_DEBUG,
02905 "Stream EndPoint Not Found\n"));
02906 }
02907 else
02908 {
02909 AVStreams::StreamEndPoint_B_ptr sep_b;
02910
02911 CORBA::Any_ptr streamendpoint_b_any =
02912 this->get_property_value ("Related_StreamEndpoint");
02913 *streamendpoint_b_any >>= sep_b;
02914 sep_b->modify_QoS (the_qos, flowspec);
02915 }
02916 }
02917 return 1;
02918 }
02919
02920 TAO_VDev::~TAO_VDev (void)
02921 {
02922 }
02923
02924
02925
02926
02927
02928
02929 TAO_MMDevice::TAO_MMDevice (TAO_AV_Endpoint_Strategy *endpoint_strategy)
02930 : endpoint_strategy_ (endpoint_strategy),
02931 flow_count_ (0),
02932 flow_num_ (0),
02933 stream_ctrl_ (0)
02934 {
02935 }
02936
02937
02938
02939 AVStreams::StreamCtrl_ptr
02940 TAO_MMDevice::bind (AVStreams::MMDevice_ptr peer_device,
02941 AVStreams::streamQoS &the_qos,
02942 CORBA::Boolean_out is_met,
02943 const AVStreams::flowSpec &the_spec)
02944 {
02945 AVStreams::StreamCtrl_ptr streamctrl (AVStreams::StreamCtrl::_nil ());
02946 try
02947 {
02948 ACE_UNUSED_ARG (is_met);
02949 ACE_NEW_RETURN (this->stream_ctrl_,
02950 TAO_StreamCtrl,
02951 0);
02952 AVStreams::MMDevice_var mmdevice = this->_this ();
02953 this->stream_ctrl_->bind_devs (peer_device,
02954 mmdevice.in (),
02955 the_qos,
02956 the_spec);
02957 streamctrl = this->stream_ctrl_->_this ();
02958 }
02959 catch (const CORBA::Exception& ex)
02960 {
02961 ex._tao_print_exception ("TAO_MMDevice::bind");
02962 return streamctrl;
02963 }
02964 return streamctrl;
02965 }
02966
02967
02968 AVStreams::StreamCtrl_ptr
02969 TAO_MMDevice::bind_mcast (AVStreams::MMDevice_ptr first_peer,
02970 AVStreams::streamQoS &the_qos,
02971 CORBA::Boolean_out is_met,
02972 const AVStreams::flowSpec &the_spec)
02973 {
02974 ACE_UNUSED_ARG (first_peer);
02975 ACE_UNUSED_ARG (the_qos);
02976 ACE_UNUSED_ARG (is_met);
02977 ACE_UNUSED_ARG (the_spec);
02978
02979 return 0;
02980 }
02981
02982 AVStreams::StreamEndPoint_ptr
02983 TAO_MMDevice::create_A_B (MMDevice_Type type,
02984 AVStreams::StreamCtrl_ptr streamctrl,
02985 AVStreams::VDev_out the_vdev,
02986 AVStreams::streamQoS &the_qos,
02987 CORBA::Boolean_out met_qos,
02988 char *&,
02989 const AVStreams::flowSpec &flow_spec)
02990 {
02991 AVStreams::StreamEndPoint_A_ptr sep_a (AVStreams::StreamEndPoint_A::_nil ());
02992 AVStreams::StreamEndPoint_B_ptr sep_b (AVStreams::StreamEndPoint_B::_nil ());
02993 AVStreams::StreamEndPoint_ptr sep (AVStreams::StreamEndPoint::_nil ());
02994 try
02995 {
02996 switch (type)
02997 {
02998 case MMDEVICE_A:
02999 {
03000 if (this->endpoint_strategy_->create_A (sep_a,
03001 the_vdev) == -1)
03002 ACE_ERROR_RETURN ((LM_ERROR,
03003 "TAO_MMDevice::create_A_B (%P|%t) - "
03004 "error in create_A\n"),
03005 0);
03006 sep = sep_a;
03007 }
03008 break;
03009 case MMDEVICE_B:
03010 {
03011 if (this->endpoint_strategy_->create_B (sep_b,
03012 the_vdev) == -1)
03013 ACE_ERROR_RETURN ((LM_ERROR,
03014 "TAO_MMDevice::create_A_B (%P|%t) - "
03015 "error in create_B\n"),
03016 0);
03017 sep = sep_b;
03018 }
03019 break;
03020 default:
03021 break;
03022 }
03023 if (this->fdev_map_.current_size () > 0)
03024 {
03025 TAO_AV_QoS qos (the_qos);
03026
03027 for (u_int i=0;i<flow_spec.length ();i++)
03028 {
03029 TAO_Forward_FlowSpec_Entry forward_entry;
03030 forward_entry.parse (flow_spec[i]);
03031 ACE_CString flow_key (forward_entry.flowname ());
03032 AVStreams::FDev_var flow_dev;
03033 AVStreams::FlowConnection_var flowconnection;
03034 try
03035 {
03036
03037
03038 CORBA::Object_var flowconnection_obj =
03039 streamctrl->get_flow_connection (forward_entry.flowname ());
03040 ACE_OS::printf("successfully called get_flow_connection\n");
03041 if (!CORBA::is_nil (flowconnection_obj.in ()))
03042 {
03043 flowconnection = AVStreams::FlowConnection::_narrow (flowconnection_obj.in ());
03044 }
03045 }
03046 catch (const AVStreams::noSuchFlow&)
03047 {
03048 TAO_FlowConnection *flowConnection;
03049 ACE_NEW_RETURN (flowConnection,
03050 TAO_FlowConnection,
03051 0);
03052 flowconnection = flowConnection->_this ();
03053 streamctrl->set_flow_connection (forward_entry.flowname(),
03054 flowconnection.in ());
03055 }
03056 catch (const CORBA::Exception& ex)
03057 {
03058
03059 ex._tao_print_exception (
03060 "TAO_MMDevice::create_a::get_flow_connection");
03061 }
03062
03063 int result = this->fdev_map_.find (flow_key, flow_dev);
03064 if (result < 0)
03065 ACE_ERROR_RETURN ((LM_ERROR, "(%N,%l) fdev_map::find failed\n"), 0);
03066
03067 CORBA::String_var named_fdev;
03068 AVStreams::FlowEndPoint_var flow_endpoint;
03069 AVStreams::QoS flow_qos;
03070 result = qos.get_flow_qos (forward_entry.flowname (), flow_qos);
03071 if (result < 0)
03072 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) get_flow_qos failed for %s\n", forward_entry.flowname ()));
03073 switch (forward_entry.direction ())
03074 {
03075 case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
03076 {
03077 switch (type)
03078 {
03079 case MMDEVICE_A:
03080 {
03081
03082
03083
03084 flow_endpoint =
03085 flow_dev->create_producer (flowconnection.in (),
03086 flow_qos,
03087 met_qos,
03088 named_fdev.inout ());
03089 }
03090 break;
03091 case MMDEVICE_B:
03092 {
03093 flow_endpoint =
03094 flow_dev->create_consumer (flowconnection.in (),
03095 flow_qos,
03096 met_qos,
03097 named_fdev.inout ());
03098 }
03099 break;
03100 }
03101 }
03102 break;
03103 case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
03104 {
03105 switch (type)
03106 {
03107 case MMDEVICE_A:
03108 {
03109
03110
03111
03112 flow_endpoint =
03113 flow_dev->create_consumer (flowconnection.in (),
03114 flow_qos,
03115 met_qos,
03116 named_fdev.inout ());
03117 }
03118 break;
03119 case MMDEVICE_B:
03120 {
03121
03122
03123
03124 flow_endpoint =
03125 flow_dev->create_producer (flowconnection.in (),
03126 flow_qos,
03127 met_qos,
03128 named_fdev.inout ());
03129 }
03130 break;
03131 }
03132 }
03133 break;
03134 default:
03135 break;
03136 }
03137 CORBA::Any flowname_any;
03138 flowname_any <<= forward_entry.flowname ();
03139 flow_endpoint->define_property ("FlowName", flowname_any);
03140 sep->add_fep (flow_endpoint.in ());
03141 }
03142 }
03143 }
03144 catch (const CORBA::Exception& ex)
03145 {
03146 ex._tao_print_exception ("TAO_MMDevice::create_A");
03147 return sep;
03148 }
03149 return sep;
03150 }
03151
03152 AVStreams::StreamEndPoint_A_ptr
03153 TAO_MMDevice::create_A (AVStreams::StreamCtrl_ptr streamctrl,
03154 AVStreams::VDev_out the_vdev,
03155 AVStreams::streamQoS &stream_qos,
03156 CORBA::Boolean_out met_qos,
03157 char *&named_vdev,
03158 const AVStreams::flowSpec &flow_spec)
03159 {
03160 AVStreams::StreamEndPoint_A_ptr sep_a = 0;
03161 AVStreams::StreamEndPoint_var sep;
03162 try
03163 {
03164 sep = this->create_A_B (MMDEVICE_A, streamctrl, the_vdev, stream_qos, met_qos, named_vdev, flow_spec);
03165 sep_a = AVStreams::StreamEndPoint_A::_narrow (sep.in());
03166
03167 ACE_ASSERT( !CORBA::is_nil( sep_a ) );
03168 }
03169 catch (const CORBA::Exception& ex)
03170 {
03171 ex._tao_print_exception ("TAO_MMDevice::create_A");
03172 return sep_a;
03173 }
03174
03175 return sep_a;
03176 }
03177
03178
03179 AVStreams::StreamEndPoint_B_ptr
03180 TAO_MMDevice::create_B (AVStreams::StreamCtrl_ptr streamctrl,
03181 AVStreams::VDev_out the_vdev,
03182 AVStreams::streamQoS &stream_qos,
03183 CORBA::Boolean_out met_qos,
03184 char *&named_vdev,
03185 const AVStreams::flowSpec &flow_spec)
03186 {
03187 AVStreams::StreamEndPoint_B_ptr sep_b = AVStreams::StreamEndPoint_B::_nil ();
03188 AVStreams::StreamEndPoint_var sep;
03189
03190 try
03191 {
03192 sep = this->create_A_B (MMDEVICE_B, streamctrl, the_vdev, stream_qos, met_qos, named_vdev, flow_spec);
03193 sep_b = AVStreams::StreamEndPoint_B::_narrow (sep.in());
03194
03195 ACE_ASSERT ( !CORBA::is_nil( sep_b ) );
03196 }
03197 catch (const CORBA::Exception& ex)
03198 {
03199 ex._tao_print_exception ("TAO_MMDevice::create_B");
03200 return sep_b;
03201 }
03202 return sep_b;
03203 }
03204
03205
03206
03207 void
03208 TAO_MMDevice::destroy (AVStreams::StreamEndPoint_ptr ,
03209 const char * )
03210 {
03211
03212
03213
03214 int result = TAO_AV_Core::deactivate_servant (this);
03215 if (result < 0)
03216 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_MMDevice::destroy failed\n"));
03217 }
03218
03219 char *
03220 TAO_MMDevice::add_fdev_i (AVStreams::FDev_ptr fdev)
03221 {
03222 char* tmp;
03223 ACE_NEW_RETURN (tmp,
03224 char[64],
03225 0);
03226 CORBA::String_var flow_name = tmp;
03227
03228 try
03229 {
03230
03231
03232 ACE_OS::sprintf (tmp, "flow%d", flow_num_++);
03233 CORBA::Any flowname_any;
03234 flowname_any <<= flow_name.in ();
03235 fdev->define_property ("Flow", flowname_any);
03236 }
03237 catch (const CORBA::Exception& ex)
03238 {
03239 ex._tao_print_exception ("TAO_MMDevice::add_fdev");
03240 return 0;
03241 }
03242 return flow_name._retn ();
03243 }
03244
03245
03246 char *
03247 TAO_MMDevice::add_fdev (CORBA::Object_ptr fdev_obj)
03248 {
03249 CORBA::String_var flow_name;
03250 AVStreams::FDev_var fdev;
03251 try
03252 {
03253 CORBA::Any_ptr flow_name_any;
03254 fdev = AVStreams::FDev::_narrow (fdev_obj);
03255
03256 if (CORBA::is_nil (fdev.in ()))
03257 return 0;
03258
03259
03260 flow_name_any = fdev->get_property_value ("Flow");
03261
03262 const char *tmp = 0;
03263 *flow_name_any >>= tmp;
03264 flow_name = CORBA::string_dup (tmp);
03265 }
03266 catch (const CORBA::Exception&)
03267 {
03268 flow_name =
03269 this->add_fdev_i (fdev.in ());
03270 }
03271
03272
03273
03274
03275 ACE_CString fdev_name_key ( flow_name.in () );
03276
03277
03278 if ( (this->fdev_map_.bind (fdev_name_key, fdev )) != 0)
03279 throw AVStreams::streamOpFailed ();
03280
03281 this->flow_count_++;
03282 this->flows_.length (this->flow_count_);
03283 this->flows_ [this->flow_count_-1] = flow_name;
03284
03285 CORBA::Any flows_any;
03286 flows_any <<= this->flows_;
03287 try
03288 {
03289 this->define_property ("Flows",
03290 flows_any);
03291 }
03292 catch (const CORBA::Exception& ex)
03293 {
03294 ex._tao_print_exception ("TAO_MMDevice::add_fdev");
03295 return 0;
03296 }
03297 return flow_name._retn ();
03298 }
03299
03300
03301 CORBA::Object_ptr
03302 TAO_MMDevice::get_fdev (const char *flow_name)
03303 {
03304
03305 ACE_CString fdev_name_key (flow_name);
03306 AVStreams::FDev_var fdev_entry;
03307 if (this->fdev_map_.find (fdev_name_key, fdev_entry) == 0)
03308 return fdev_entry._retn() ;
03309 return 0;
03310 }
03311
03312
03313 void
03314 TAO_MMDevice::remove_fdev (const char *flow_name)
03315 {
03316 try
03317 {
03318 ACE_CString fdev_name_key (flow_name);
03319 AVStreams::FDev_var fdev_entry;
03320
03321 if (this->fdev_map_.unbind (fdev_name_key, fdev_entry)!= 0)
03322 throw AVStreams::streamOpFailed ();
03323
03324 AVStreams::flowSpec new_flows (this->flows_.length ());
03325 for (u_int i=0, j=0 ; i <this->flows_.length (); i++)
03326 if (ACE_OS::strcmp (flow_name, this->flows_[i]) != 0)
03327 new_flows[j++] = this->flows_[i];
03328
03329 CORBA::Any flows;
03330 flows <<= new_flows;
03331 this->flows_ = new_flows;
03332 this->define_property ("Flows",
03333 flows);
03334 }
03335 catch (const CORBA::Exception& ex)
03336 {
03337 ex._tao_print_exception ("TAO_MMDevice::remove_fdev");
03338 }
03339 }
03340
03341
03342 TAO_MMDevice::~TAO_MMDevice (void)
03343 {
03344 delete this->stream_ctrl_;
03345 }
03346
03347
03348
03349
03350
03351
03352 TAO_FlowConnection::TAO_FlowConnection (void)
03353 :fp_name_ (CORBA::string_dup ("")),
03354 ip_multicast_ (0)
03355 {
03356 }
03357
03358
03359
03360
03361
03362
03363
03364
03365
03366 int
03367 TAO_FlowConnection::set_mcast_addr (ACE_CString mcast_addr, u_short mcast_port)
03368 {
03369 this->mcast_addr_ = mcast_addr;
03370 this->mcast_port_ = mcast_port;
03371 return 0;
03372 }
03373
03374 void
03375 TAO_FlowConnection::set_protocol (const char *protocol)
03376 {
03377 this->protocol_ = protocol;
03378 }
03379
03380
03381 void
03382 TAO_FlowConnection::stop (void)
03383 {
03384 try
03385 {
03386 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03387 ();
03388 for (FlowProducer_SetItor producer_end =
03389 this->flow_producer_set_.end ();
03390 producer_begin != producer_end; ++producer_begin)
03391 {
03392 (*producer_begin)->stop ();
03393 }
03394 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03395 ();
03396 for (FlowConsumer_SetItor consumer_end =
03397 this->flow_consumer_set_.end ();
03398 consumer_begin != consumer_end; ++consumer_begin)
03399 {
03400 (*consumer_begin)->stop ();
03401 }
03402 }
03403 catch (const AVStreams::noSuchFlow&)
03404 {
03405 throw;
03406 }
03407 catch (const CORBA::Exception& ex)
03408 {
03409 ex._tao_print_exception ("TAO_FlowConnection::stop");
03410 throw;
03411 }
03412 catch(...)
03413 {
03414 printf ("TAO_FlowConnection::stop - unknown exception\n");
03415 }
03416 }
03417
03418
03419 void
03420 TAO_FlowConnection::start (void)
03421 {
03422 try
03423 {
03424 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03425 ();
03426 for (FlowConsumer_SetItor consumer_end =
03427 this->flow_consumer_set_.end ();
03428 consumer_begin != consumer_end; ++consumer_begin)
03429 {
03430 (*consumer_begin)->start ();
03431 }
03432 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03433 ();
03434 for (FlowProducer_SetItor producer_end =
03435 this->flow_producer_set_.end ();
03436 producer_begin != producer_end; ++producer_begin)
03437 {
03438 (*producer_begin)->start ();
03439 }
03440 }
03441 catch (const AVStreams::noSuchFlow&)
03442 {
03443 throw;
03444 }
03445 catch (const CORBA::Exception& ex)
03446 {
03447 ex._tao_print_exception ("TAO_FlowConnection::start");
03448 throw;
03449 }
03450 catch(...)
03451 {
03452 printf ("TAO_FlowConnection::start - unknown exception\n");
03453 }
03454 }
03455
03456
03457 void
03458 TAO_FlowConnection::destroy (void)
03459 {
03460 try
03461 {
03462 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03463 ();
03464 for (FlowProducer_SetItor producer_end =
03465 this->flow_producer_set_.end ();
03466 producer_begin != producer_end; ++producer_begin)
03467 {
03468 (*producer_begin)->destroy ();
03469 }
03470 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03471 ();
03472 for (FlowConsumer_SetItor consumer_end =
03473 this->flow_consumer_set_.end ();
03474 consumer_begin != consumer_end; ++consumer_begin)
03475 {
03476 (*consumer_begin)->destroy ();
03477 }
03478 }
03479 catch (const CORBA::Exception& ex)
03480 {
03481 ex._tao_print_exception ("TAO_FlowConnection::destroy");
03482 return;
03483 }
03484 int result = TAO_AV_Core::deactivate_servant (this);
03485 if (result < 0)
03486 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::destroy failed\n"));
03487 }
03488
03489
03490 CORBA::Boolean
03491 TAO_FlowConnection::modify_QoS (AVStreams::QoS & new_qos)
03492 {
03493 ACE_UNUSED_ARG (new_qos);
03494 return 0;
03495 }
03496
03497
03498 CORBA::Boolean
03499 TAO_FlowConnection::use_flow_protocol (const char * fp_name,
03500 const CORBA::Any & fp_settings)
03501 {
03502 this->fp_name_ = fp_name;
03503 this->fp_settings_ = fp_settings;
03504 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03505 ();
03506 for (FlowProducer_SetItor producer_end =
03507 this->flow_producer_set_.end ();
03508 producer_begin != producer_end; ++producer_begin)
03509 {
03510 (*producer_begin)->use_flow_protocol
03511 (fp_name, fp_settings);
03512 }
03513 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03514 ();
03515 for (FlowConsumer_SetItor consumer_end =
03516 this->flow_consumer_set_.end ();
03517 consumer_begin != consumer_end; ++consumer_begin)
03518 {
03519 (*consumer_begin)->use_flow_protocol
03520 (fp_name, fp_settings);
03521 }
03522 return 1;
03523 }
03524
03525 void
03526 TAO_FlowConnection::push_event (const AVStreams::streamEvent & the_event)
03527 {
03528 ACE_UNUSED_ARG (the_event);
03529 }
03530
03531 CORBA::Boolean
03532 TAO_FlowConnection::connect_devs (AVStreams::FDev_ptr a_party,
03533 AVStreams::FDev_ptr b_party,
03534 AVStreams::QoS & flow_qos)
03535 {
03536 CORBA::Boolean result = 0;
03537 try
03538 {
03539 AVStreams::FlowConnection_var flowconnection = this->_this ();
03540 CORBA::Boolean met_qos;
03541 CORBA::String_var named_fdev ((const char *)"");
03542 AVStreams::FlowProducer_var producer =
03543 a_party->create_producer (flowconnection.in (),
03544 flow_qos,
03545 met_qos,
03546 named_fdev.inout ());
03547 AVStreams::FlowConsumer_var consumer =
03548 b_party->create_consumer (flowconnection.in (),
03549 flow_qos,
03550 met_qos,
03551 named_fdev.inout ());
03552 result = this->connect (producer.in (),
03553 consumer.in (),
03554 flow_qos);
03555 }
03556 catch (const CORBA::Exception& ex)
03557 {
03558 ex._tao_print_exception (
03559 "TAO_FlowConnection::connect_devs");
03560 return 0;
03561 }
03562 return result;
03563 }
03564
03565
03566 CORBA::Boolean
03567 TAO_FlowConnection::connect (AVStreams::FlowProducer_ptr producer,
03568 AVStreams::FlowConsumer_ptr consumer,
03569 AVStreams::QoS & the_qos)
03570 {
03571 try
03572 {
03573
03574 AVStreams::FlowProducer_ptr flow_producer =
03575 AVStreams::FlowProducer::_duplicate (producer);
03576 AVStreams::FlowConsumer_ptr flow_consumer =
03577 AVStreams::FlowConsumer::_duplicate (consumer);
03578
03579 this->flow_producer_set_.insert (flow_producer);
03580 this->flow_consumer_set_.insert (flow_consumer);
03581 AVStreams::FlowConnection_var flowconnection =
03582 this->_this ();
03583
03584 flow_producer->set_peer (flowconnection.in (),
03585 flow_consumer,
03586 the_qos);
03587
03588 flow_consumer->set_peer (flowconnection.in (),
03589 flow_producer,
03590 the_qos);
03591
03592 char *consumer_address =
03593 flow_consumer->go_to_listen (the_qos,
03594 0,
03595 flow_producer,
03596 this->fp_name_.inout ());
03597
03598 if (ACE_OS::strcmp (consumer_address, "") == 0)
03599 {
03600
03601 consumer_address = flow_producer->go_to_listen (the_qos,
03602 0,
03603 flow_consumer,
03604 this->fp_name_.inout ());
03605 flow_consumer->connect_to_peer (the_qos,
03606 consumer_address,
03607 this->fp_name_.inout ());
03608
03609
03610 }
03611 else
03612 {
03613 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::connect_to_peer addres: %s", consumer_address));
03614 flow_producer->connect_to_peer (the_qos,
03615 consumer_address,
03616 this->fp_name_.inout ());
03617 }
03618 }
03619 catch (const CORBA::Exception& ex)
03620 {
03621 ex._tao_print_exception ("TAO_FlowConnection::connect");
03622 }
03623 return 1;
03624 }
03625
03626
03627 CORBA::Boolean
03628 TAO_FlowConnection::disconnect (void)
03629 {
03630 return 0;
03631 }
03632
03633 CORBA::Boolean
03634 TAO_FlowConnection::add_producer (AVStreams::FlowProducer_ptr producer,
03635 AVStreams::QoS & the_qos)
03636 {
03637 try
03638 {
03639 AVStreams::FlowProducer_ptr flow_producer =
03640 AVStreams::FlowProducer::_duplicate (producer);
03641
03642
03643
03644
03645
03646 FlowProducer_SetItor begin = this->flow_producer_set_.begin ();
03647 FlowProducer_SetItor end = this->flow_producer_set_.end ();
03648 for (; begin != end; ++begin)
03649 {
03650 if ((*begin)->_is_equivalent (producer))
03651
03652 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
03653 }
03654
03655
03656
03657
03658 int result = this->flow_producer_set_.insert (flow_producer);
03659 if (result == 1)
03660 {
03661
03662 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
03663 }
03664 CORBA::Boolean met_qos;
03665 char mcast_address[BUFSIZ];
03666 if (this->producer_address_.in () == 0)
03667 {
03668 ACE_INET_Addr mcast_addr;
03669 mcast_addr.set (this->mcast_port_,
03670 this->mcast_addr_.c_str ()
03671 );
03672
03673 ACE_TCHAR buf [BUFSIZ];
03674 mcast_addr.addr_to_string (buf, BUFSIZ);
03675 ACE_OS::sprintf (mcast_address, "%s=%s", this->protocol_.in (), buf);
03676 }
03677 else
03678 {
03679 ACE_OS::strcpy (mcast_address, this->producer_address_.in ());
03680 }
03681 char *address = flow_producer->connect_mcast (the_qos,
03682 met_qos,
03683 mcast_address,
03684 this->fp_name_.in ());
03685
03686 if (this->producer_address_.in () == 0)
03687 {
03688 TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address);
03689 if (entry.address () != 0)
03690 {
03691
03692 this->producer_address_ = address;
03693 }
03694 else
03695 {
03696
03697 this->ip_multicast_ = 0;
03698 }
03699 }
03700
03701 if (CORBA::is_nil (this->mcastconfigif_.in ()))
03702 {
03703 ACE_NEW_RETURN (this->mcastconfigif_i_,
03704 TAO_MCastConfigIf,
03705 0);
03706 this->mcastconfigif_ = this->mcastconfigif_i_->_this ();
03707 }
03708 AVStreams::FlowConnection_var flowconnection = this->_this ();
03709 flow_producer->set_Mcast_peer (flowconnection.in (),
03710 this->mcastconfigif_.in (),
03711 the_qos);
03712 }
03713 catch (const CORBA::Exception& ex)
03714 {
03715 ex._tao_print_exception (
03716 "TAO_FlowConnection::add_producer");
03717 return 0;
03718 }
03719 return 1;
03720 }
03721
03722 CORBA::Boolean
03723 TAO_FlowConnection::add_consumer (AVStreams::FlowConsumer_ptr consumer,
03724 AVStreams::QoS & the_qos)
03725 {
03726 try
03727 {
03728 AVStreams::FlowConsumer_ptr flow_consumer =
03729 AVStreams::FlowConsumer::_duplicate (consumer);
03730 FlowConsumer_SetItor begin = this->flow_consumer_set_.begin ();
03731 FlowConsumer_SetItor end = this->flow_consumer_set_.end ();
03732 for (; begin != end; ++begin)
03733 {
03734 if ((*begin)->_is_equivalent (consumer))
03735
03736 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_Consumer: Consumer already exists\n"), 1);
03737 }
03738 int result = this->flow_consumer_set_.insert (flow_consumer);
03739 if (result == 1)
03740 {
03741
03742 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_consumer: consumer already exists\n"), 1);
03743 }
03744
03745 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin ();
03746
03747
03748
03749
03750 AVStreams::FlowProducer_ptr flow_producer = (*producer_begin);
03751
03752 AVStreams::protocolSpec protocols (1);
03753 protocols.length (1);
03754 protocols [0] = CORBA::string_dup (this->producer_address_.in ());
03755
03756 if (!this->ip_multicast_)
03757 {
03758 flow_consumer->set_protocol_restriction (protocols);
03759 char * address =
03760 flow_consumer->go_to_listen (the_qos,
03761 1,
03762 flow_producer,
03763 this->fp_name_.inout ());
03764 CORBA::Boolean is_met;
03765 flow_producer->connect_mcast (the_qos,
03766 is_met,
03767 address,
03768 this->fp_name_.inout ());
03769 }
03770 else
03771 {
03772
03773
03774
03775
03776
03777 flow_consumer->connect_to_peer (the_qos,
03778 this->producer_address_.in (),
03779 this->fp_name_.inout ());
03780
03781
03782
03783
03784
03785
03786
03787
03788 }
03789 if (CORBA::is_nil (this->mcastconfigif_.in ()))
03790 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowConnection::add_consumer: first add a producer and then a consumer\n"), 0);
03791
03792 AVStreams::flowSpec flow_spec;
03793 AVStreams::streamQoS stream_qos (1);
03794 stream_qos.length (1);
03795 stream_qos [0] = the_qos;
03796 this->mcastconfigif_->set_peer (flow_consumer,
03797 stream_qos,
03798 flow_spec);
03799 }
03800 catch (const CORBA::Exception& ex)
03801 {
03802 ex._tao_print_exception (
03803 "TAO_FlowConnection::add_consumer");
03804 return 0;
03805 }
03806 return 1;
03807 }
03808
03809 CORBA::Boolean
03810 TAO_FlowConnection::drop (AVStreams::FlowEndPoint_ptr target)
03811 {
03812 ACE_UNUSED_ARG (target);
03813 return 0;
03814 }
03815
03816
03817
03818
03819
03820
03821 TAO_FlowEndPoint::TAO_FlowEndPoint (void)
03822 :lock_ (0)
03823 {
03824 }
03825
03826 TAO_FlowEndPoint::TAO_FlowEndPoint (const char *flowname,
03827 AVStreams::protocolSpec &protocols,
03828 const char *format)
03829 {
03830 this->open (flowname, protocols, format);
03831 }
03832
03833 void
03834 TAO_FlowEndPoint::set_flow_handler (const char * ,
03835 TAO_AV_Flow_Handler * )
03836 {
03837 }
03838
03839 int
03840 TAO_FlowEndPoint::open (const char *flowname,
03841 AVStreams::protocolSpec &protocols,
03842 const char *format)
03843 {
03844 this->flowname_ = flowname;
03845 this->format_ = format;
03846
03847 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowEndPoint::open\n"));
03848 try
03849 {
03850 CORBA::Any flowname_any;
03851 flowname_any <<= flowname;
03852 this->define_property ("FlowName",
03853 flowname_any);
03854 this->set_format (format);
03855 this->protocol_addresses_ = protocols;
03856 AVStreams::protocolSpec protocol_spec (protocols.length ());
03857 protocol_spec.length (protocols.length ());
03858 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
03859 for (u_int i=0;i<protocols.length ();i++)
03860 {
03861 CORBA::String_var address = CORBA::string_dup (protocols [i]);
03862 TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address.in ());
03863 protocol_spec [i] = CORBA::string_dup (entry.carrier_protocol_str ());
03864 if (TAO_debug_level > 0)
03865 ACE_DEBUG ((LM_DEBUG,
03866 "[%s]\n",
03867 static_cast<char const*>(protocol_spec[i])));
03868 }
03869 this->set_protocol_restriction (protocol_spec);
03870 }
03871 catch (const CORBA::Exception& ex)
03872 {
03873 ex._tao_print_exception ("TAO_FlowEndPoint::open");
03874 return -1;
03875 }
03876 return 0;
03877 }
03878
03879
03880 int
03881 TAO_FlowEndPoint::set_flowname (const char *flowname)
03882 {
03883 this->flowname_ = flowname;
03884 return 0;
03885 }
03886
03887
03888
03889 CORBA::Boolean
03890 TAO_FlowEndPoint::lock (void)
03891 {
03892
03893
03894 if (this->lock_)
03895 return 0;
03896 this->lock_ = 1;
03897 return 1;
03898 }
03899
03900
03901 void
03902 TAO_FlowEndPoint::unlock (void)
03903 {
03904 this->lock_ = 0;
03905 }
03906
03907
03908 void
03909 TAO_FlowEndPoint::destroy (void)
03910 {
03911 int result = TAO_AV_Core::deactivate_servant (this);
03912 if (result < 0)
03913 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::destroy failed\n"));
03914 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
03915 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
03916 begin != end; ++begin)
03917 (*begin)->protocol_object ()->destroy ();
03918 }
03919
03920 AVStreams::StreamEndPoint_ptr
03921 TAO_FlowEndPoint::related_sep (void)
03922 {
03923
03924 return AVStreams::StreamEndPoint::_duplicate (this->related_sep_.in ());
03925 }
03926
03927 void
03928 TAO_FlowEndPoint::related_sep (AVStreams::StreamEndPoint_ptr related_sep)
03929 {
03930 this->related_sep_ = AVStreams::StreamEndPoint::_duplicate (related_sep);
03931 }
03932
03933 AVStreams::FlowConnection_ptr
03934 TAO_FlowEndPoint::related_flow_connection (void)
03935 {
03936 return AVStreams::FlowConnection::_duplicate (this->related_flow_connection_.in ());
03937 }
03938
03939 void
03940 TAO_FlowEndPoint::related_flow_connection (AVStreams::FlowConnection_ptr related_flow_connection)
03941 {
03942 this->related_flow_connection_ = AVStreams::FlowConnection::_duplicate (related_flow_connection);
03943 }
03944
03945
03946 AVStreams::FlowEndPoint_ptr
03947 TAO_FlowEndPoint::get_connected_fep (void)
03948 {
03949 return AVStreams::FlowEndPoint::_duplicate (this->peer_fep_.in ());
03950 }
03951
03952 CORBA::Boolean
03953 TAO_FlowEndPoint::use_flow_protocol (const char * fp_name,
03954 const CORBA::Any &)
03955 {
03956 try
03957 {
03958
03959 CORBA::Any flowname_property;
03960 flowname_property <<= fp_name;
03961 this->define_property ("FlowProtocol",
03962 flowname_property);
03963 }
03964 catch (const CORBA::Exception& ex)
03965 {
03966 ex._tao_print_exception (
03967 "TAO_FlowEndPoint::use_flow_protocol");
03968 return 0;
03969 }
03970 return 1;
03971 }
03972
03973 void
03974 TAO_FlowEndPoint::set_format (const char * format)
03975 {
03976 this->format_ = format;
03977 try
03978 {
03979
03980
03981 CORBA::Any format_val;
03982 format_val <<= format;
03983 this->define_property ("Format",
03984 format_val);
03985 }
03986 catch (const CORBA::Exception& ex)
03987 {
03988 ex._tao_print_exception ("TAO_FlowEndpoint::set_format");
03989 }
03990 }
03991
03992 void
03993 TAO_FlowEndPoint::set_dev_params (const CosPropertyService::Properties & new_settings)
03994 {
03995 this->dev_params_ = new_settings;
03996 try
03997 {
03998 CORBA::Any DevParams_property;
03999 DevParams_property <<= new_settings;
04000 this->define_property ("DevParams",
04001 DevParams_property);
04002 }
04003 catch (const CORBA::Exception& ex)
04004 {
04005 ex._tao_print_exception (
04006 "TAO_FlowEndPoint::set_dev_params");
04007 }
04008 }
04009
04010 void
04011 TAO_FlowEndPoint::set_protocol_restriction (const AVStreams::protocolSpec & protocols)
04012 {
04013 try
04014 {
04015 u_int i = 0;
04016 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
04017 for (i=0;i<protocols.length ();i++)
04018 {
04019 const char *protocol = (protocols)[i];
04020 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", protocol));
04021 }
04022 CORBA::Any AvailableProtocols_property;
04023 AvailableProtocols_property <<= protocols;
04024 this->define_property ("AvailableProtocols",
04025 AvailableProtocols_property);
04026 AVStreams::protocolSpec *temp_spec = 0;
04027 CORBA::Any_var temp_any = this->get_property_value ("AvailableProtocols");
04028 temp_any.in () >>= temp_spec;
04029 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
04030 for (i=0;i<temp_spec->length ();i++)
04031 {
04032 const char *protocol = (*temp_spec)[i];
04033 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", protocol));
04034 }
04035 this->protocols_ = protocols;
04036 }
04037 catch (const CORBA::Exception& ex)
04038 {
04039 ex._tao_print_exception (
04040 "TAO_FlowEndpoint::set_protocol_restriction");
04041 }
04042 }
04043
04044 CORBA::Boolean
04045 TAO_FlowEndPoint::is_fep_compatible (AVStreams::FlowEndPoint_ptr peer_fep)
04046 {
04047 const char *exception_message = "";
04048 try
04049 {
04050
04051
04052
04053 CORBA::Any_var format_ptr;
04054 CORBA::String_var my_format, peer_format;
04055
04056 exception_message = "TAO_FlowEndPoint::is_fep_compatible - Format";
04057 format_ptr = this->get_property_value ("Format");
04058
04059 const char *temp_format = 0;
04060 format_ptr.in () >>= temp_format;
04061 my_format = CORBA::string_dup (temp_format);
04062
04063 exception_message = "TAO_FlowEndPoint::is_fep_compatible - Format[2]";
04064 format_ptr = peer_fep->get_property_value ("Format");
04065 format_ptr.in () >>= temp_format;
04066 peer_format = CORBA::string_dup (temp_format);
04067 if (ACE_OS::strcmp (my_format.in (),
04068 peer_format.in ()) != 0)
04069 return 0;
04070
04071
04072 CORBA::Any_var AvailableProtocols_ptr;
04073 AVStreams::protocolSpec my_protocol_spec, peer_protocol_spec;
04074 AVStreams::protocolSpec *temp_protocols = 0;
04075
04076 exception_message =
04077 "TAO_FlowEndPoint::is_fep_compatible - AvailableProtocols";
04078 AvailableProtocols_ptr = this->get_property_value ("AvailableProtocols");
04079 AvailableProtocols_ptr.in () >>= temp_protocols;
04080 my_protocol_spec = *temp_protocols;
04081
04082 exception_message =
04083 "TAO_FlowEndPoint::is_fep_compatible - AvailableProtocols[2]";
04084 AvailableProtocols_ptr = peer_fep->get_property_value ("AvailableProtocols");
04085 AvailableProtocols_ptr.in () >>= temp_protocols;
04086 peer_protocol_spec = *temp_protocols;
04087
04088 int protocol_match = 0;
04089 for (u_int i=0;i<my_protocol_spec.length ();i++)
04090 {
04091 CORBA::String_var my_protocol_string;
04092 for (u_int j=0;j<peer_protocol_spec.length ();j++)
04093 {
04094 CORBA::String_var peer_protocol_string;
04095 my_protocol_string = CORBA::string_dup (my_protocol_spec[i]);
04096 peer_protocol_string = CORBA::string_dup (peer_protocol_spec[j]);
04097 if (ACE_OS::strcmp (my_protocol_string.in (), peer_protocol_string.in ()) == 0)
04098 {
04099 protocol_match = 1;
04100 break;
04101 }
04102 }
04103 if (protocol_match)
04104 break;
04105 }
04106 if (!protocol_match)
04107 return 0;
04108 }
04109 catch (const CosPropertyService::PropertyNotFound& nf)
04110 {
04111 nf._tao_print_exception (exception_message);
04112 }
04113 catch (const CORBA::Exception& ex)
04114 {
04115 ex._tao_print_exception ("TAO_FlowEndPoint::is_fep_compatible");
04116 return 0;
04117 }
04118 return 1;
04119 }
04120
04121 CORBA::Boolean
04122 TAO_FlowEndPoint::set_peer (AVStreams::FlowConnection_ptr ,
04123 AVStreams::FlowEndPoint_ptr the_peer_fep,
04124 AVStreams::QoS & )
04125 {
04126 this->peer_fep_ =
04127 AVStreams::FlowEndPoint::_duplicate (the_peer_fep);
04128 return 1;
04129 }
04130
04131 CORBA::Boolean
04132 TAO_FlowEndPoint::set_Mcast_peer (AVStreams::FlowConnection_ptr ,
04133 AVStreams::MCastConfigIf_ptr mcast_peer,
04134 AVStreams::QoS & )
04135 {
04136 this->mcast_peer_ = AVStreams::MCastConfigIf::_duplicate (mcast_peer);
04137 return 0;
04138 }
04139
04140 char *
04141 TAO_FlowEndPoint::go_to_listen_i (TAO_FlowSpec_Entry::Role role,
04142 AVStreams::QoS & ,
04143 CORBA::Boolean ,
04144 AVStreams::FlowEndPoint_ptr peer_fep,
04145 char *& flowProtocol)
04146 {
04147 char direction [BUFSIZ];
04148 switch (role)
04149 {
04150 case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
04151 ACE_OS::strcpy (direction, "IN");
04152 break;
04153 case TAO_FlowSpec_Entry::TAO_AV_CONSUMER:
04154 ACE_OS::strcpy (direction, "OUT");
04155 break;
04156 default:
04157 break;
04158 }
04159 AVStreams::protocolSpec my_protocol_spec, peer_protocol_spec;
04160 AVStreams::protocolSpec *temp_protocols = 0;
04161 CORBA::Any_var AvailableProtocols_ptr =
04162 peer_fep->get_property_value ("AvailableProtocols");
04163 AvailableProtocols_ptr.in () >>= temp_protocols;
04164 peer_protocol_spec = *temp_protocols;
04165 AvailableProtocols_ptr =
04166 this->get_property_value ("AvailableProtocols");
04167 AvailableProtocols_ptr.in () >>= temp_protocols;
04168 my_protocol_spec = *temp_protocols;
04169 int protocol_match = 0;
04170 CORBA::String_var listen_protocol;
04171 u_int i =0;
04172 for (i=0;i<my_protocol_spec.length ();i++)
04173 {
04174 CORBA::String_var my_protocol_string;
04175 for (u_int j=0;j<peer_protocol_spec.length ();j++)
04176 {
04177 CORBA::String_var peer_protocol_string;
04178 my_protocol_string = CORBA::string_dup (my_protocol_spec[i]);
04179 peer_protocol_string = CORBA::string_dup (peer_protocol_spec[j]);
04180 if (ACE_OS::strcmp (my_protocol_string.in (), peer_protocol_string.in ()) == 0)
04181 {
04182 listen_protocol = my_protocol_string;
04183 protocol_match = 1;
04184 break;
04185 }
04186 }
04187 if (protocol_match)
04188 break;
04189 }
04190 if (!protocol_match)
04191 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowEndPoint::go_to_listen failed: no protoocol match\n"), 0);
04192
04193 for (u_int j=0;j<this->protocol_addresses_.length ();j++)
04194 if (ACE_OS::strncmp (this->protocol_addresses_ [j], listen_protocol.in (), ACE_OS::strlen (listen_protocol.in ())) == 0)
04195 {
04196
04197 TAO_Forward_FlowSpec_Entry *entry;
04198 ACE_NEW_RETURN (entry,
04199 TAO_Forward_FlowSpec_Entry (this->flowname_.in (),
04200 direction,
04201 this->format_.in (),
04202 flowProtocol,
04203 this->protocol_addresses_ [j]),
04204 0);
04205
04206 TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry ();
04207 this->flow_spec_set_.insert (entry);
04208 int result = acceptor_registry->open (this,
04209 TAO_AV_CORE::instance (),
04210 this->flow_spec_set_);
04211 if (result < 0)
04212 return 0;
04213 char *listen_address = entry->get_local_addr_str ();
04214 char *address;
04215 ACE_NEW_RETURN (address,
04216 char [BUFSIZ],
04217 0);
04218 ACE_OS::sprintf (address, "%s=%s", listen_protocol.in (), listen_address);
04219 return address;
04220 }
04221 return 0;
04222 }
04223
04224
04225 CORBA::Boolean
04226 TAO_FlowEndPoint::connect_to_peer_i (TAO_FlowSpec_Entry::Role role,
04227 AVStreams::QoS & ,
04228 const char * address,
04229 const char * use_flow_protocol)
04230 {
04231 char direction [BUFSIZ];
04232 switch (role)
04233 {
04234 case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
04235 ACE_OS::strcpy (direction, "IN");
04236 break;
04237 case TAO_FlowSpec_Entry::TAO_AV_CONSUMER:
04238 ACE_OS::strcpy (direction, "OUT");
04239 break;
04240 default:
04241 break;
04242 }
04243 TAO_Forward_FlowSpec_Entry *entry;
04244 ACE_NEW_RETURN (entry,
04245 TAO_Forward_FlowSpec_Entry (this->flowname_.in (),
04246 direction,
04247 this->format_.in (),
04248 use_flow_protocol,
04249 address),
04250 0);
04251 this->flow_spec_set_.insert (entry);
04252 TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry ();
04253 int result = connector_registry->open (this,
04254 TAO_AV_CORE::instance (),
04255 this->flow_spec_set_);
04256 if (result < 0)
04257 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowEndPoint::connector_registry::open failed\n"), 0);
04258 this->reverse_channel_ = entry->get_local_addr_str ();
04259 return 1;
04260 }
04261
04262 int
04263 TAO_FlowEndPoint::set_protocol_object (const char * ,
04264 TAO_AV_Protocol_Object * )
04265 {
04266 return 0;
04267 }
04268
04269
04270
04271
04272
04273
04274
04275 TAO_FlowProducer::TAO_FlowProducer (void)
04276 {
04277 }
04278
04279 TAO_FlowProducer::TAO_FlowProducer (const char *flowname,
04280 AVStreams::protocolSpec protocols,
04281 const char *format)
04282 {
04283 this->open (flowname, protocols, format);
04284 }
04285
04286
04287 char *
04288 TAO_FlowProducer::get_rev_channel (const char * )
04289 {
04290 return 0;
04291 }
04292
04293
04294 void
04295 TAO_FlowProducer::stop (void)
04296 {
04297 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04298 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04299 begin != end; ++begin)
04300 {
04301 TAO_FlowSpec_Entry *entry = (*begin);
04302 entry->handler ()->stop (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
04303 }
04304 }
04305
04306 void
04307 TAO_FlowProducer::start (void)
04308 {
04309 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04310 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04311 begin != end; ++begin)
04312 {
04313 TAO_FlowSpec_Entry *entry = (*begin);
04314 if (entry->handler () != 0)
04315 {
04316 entry->handler ()->start (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
04317 }
04318 if (entry->control_handler () != 0)
04319 {
04320 entry->control_handler ()->start (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
04321 }
04322 }
04323 }
04324
04325 char *
04326 TAO_FlowProducer::go_to_listen (AVStreams::QoS & the_qos,
04327 CORBA::Boolean is_mcast,
04328 AVStreams::FlowEndPoint_ptr peer_fep,
04329 char *& flowProtocol)
04330 {
04331 return this->go_to_listen_i (TAO_FlowSpec_Entry::TAO_AV_PRODUCER,
04332 the_qos,
04333 is_mcast,
04334 peer_fep,
04335 flowProtocol);
04336 }
04337
04338 CORBA::Boolean
04339 TAO_FlowProducer::connect_to_peer (AVStreams::QoS & the_qos,
04340 const char * address,
04341 const char * use_flow_protocol)
04342 {
04343 return this->connect_to_peer_i (TAO_FlowSpec_Entry::TAO_AV_PRODUCER,
04344 the_qos,
04345 address,
04346 use_flow_protocol);
04347 }
04348
04349 char *
04350 TAO_FlowProducer::connect_mcast (AVStreams::QoS & ,
04351 CORBA::Boolean_out ,
04352 const char *address,
04353 const char * use_flow_protocol)
04354 {
04355
04356 for (u_int i=0;i<this->protocols_.length ();i++)
04357 {
04358
04359 }
04360
04361 if (address == 0)
04362 if (TAO_debug_level > 0)
04363 ACE_DEBUG ((LM_DEBUG, "TAO_FlowProducer::connect_mcast address is 0\n"));
04364 TAO_Forward_FlowSpec_Entry *entry;
04365 ACE_NEW_RETURN (entry,
04366 TAO_Forward_FlowSpec_Entry(this->flowname_.in (),
04367 "IN",
04368 this->format_.in (),
04369 use_flow_protocol,
04370 address),
04371 0);
04372
04373 this->flow_spec_set_.insert (entry);
04374 TAO_AV_Acceptor_Registry *acceptor_registry =
04375 TAO_AV_CORE::instance ()->acceptor_registry ();
04376 int result = acceptor_registry->open (this,
04377 TAO_AV_CORE::instance (),
04378 this->flow_spec_set_);
04379 if (result < 0)
04380 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowProducer::connect_mcast:acceptor_registry open failed\n"), 0);
04381
04382
04383 ACE_Event_Handler *event_handler = entry->handler ()->event_handler ();
04384 event_handler->reactor ()->remove_handler (event_handler,
04385 ACE_Event_Handler::READ_MASK);
04386 return CORBA::string_dup (address);
04387 }
04388
04389
04390 void
04391 TAO_FlowProducer::set_key (const AVStreams::key & the_key)
04392 {
04393 try
04394 {
04395 CORBA::Any anyval;
04396 anyval <<= the_key;
04397 this->define_property ("PublicKey",
04398 anyval);
04399 }
04400 catch (const CORBA::Exception& ex)
04401 {
04402 ex._tao_print_exception ("TAO_FlowProducer::set_key");
04403 }
04404 }
04405
04406
04407 void
04408 TAO_FlowProducer::set_source_id (CORBA::Long source_id)
04409 {
04410 this->source_id_ = source_id;
04411 }
04412
04413
04414
04415
04416
04417
04418
04419 TAO_FlowConsumer::TAO_FlowConsumer (void)
04420 {
04421 }
04422
04423 TAO_FlowConsumer::TAO_FlowConsumer (const char *flowname,
04424 AVStreams::protocolSpec protocols,
04425 const char *format)
04426 {
04427 this->open (flowname, protocols, format);
04428 }
04429
04430
04431 void
04432 TAO_FlowConsumer::stop (void)
04433 {
04434 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04435 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04436 begin != end; ++begin)
04437 (*begin)->handler ()->stop (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
04438 }
04439
04440 void
04441 TAO_FlowConsumer::start (void)
04442 {
04443 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04444 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04445 begin != end; ++begin)
04446 {
04447 (*begin)->handler ()->start (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
04448 }
04449 }
04450
04451 char *
04452 TAO_FlowConsumer::go_to_listen (AVStreams::QoS & the_qos,
04453 CORBA::Boolean is_mcast,
04454 AVStreams::FlowEndPoint_ptr peer_fep,
04455 char *& flowProtocol)
04456 {
04457 return this->go_to_listen_i (TAO_FlowSpec_Entry::TAO_AV_CONSUMER,
04458 the_qos,
04459 is_mcast,
04460 peer_fep,
04461 flowProtocol);
04462 }
04463
04464 CORBA::Boolean
04465 TAO_FlowConsumer::connect_to_peer (AVStreams::QoS & the_qos,
04466 const char * address,
04467 const char * use_flow_protocol)
04468 {
04469 return this->connect_to_peer_i (TAO_FlowSpec_Entry::TAO_AV_CONSUMER,
04470 the_qos,
04471 address,
04472 use_flow_protocol);
04473 }
04474
04475
04476
04477
04478 TAO_Tokenizer::TAO_Tokenizer (const char *string, char delimiter)
04479 :token_array_ (10, (char*)0, 0),
04480 count_ (0)
04481 {
04482 this->parse (string, delimiter);
04483 }
04484
04485 TAO_Tokenizer::~TAO_Tokenizer ()
04486 {
04487 for (unsigned int i=0; i<this->num_tokens_; i++)
04488 CORBA::string_free (this->token_array_[i]);
04489 }
04490
04491
04492 int
04493 TAO_Tokenizer::parse (const char *string, char delimiter)
04494 {
04495 ACE_CString new_string (string);
04496 u_int pos =0;
04497 ACE_CString::size_type slash_pos = 0;
04498 u_int count = 0;
04499 int result;
04500 while (pos < new_string.length ())
04501 {
04502 slash_pos = new_string.find (delimiter, pos);
04503 ACE_CString substring;
04504 if (slash_pos != new_string.npos)
04505 {
04506 substring = new_string.substring (pos,
04507 slash_pos - pos);
04508 pos = slash_pos + 1;
04509 }
04510 else
04511 {
04512 substring = new_string.substring (pos);
04513 pos = static_cast<int> (new_string.length ());
04514 }
04515 char *token = CORBA::string_dup (substring.c_str ());
04516 result = this->token_array_.set (token, count);
04517 if (result == -1)
04518 {
04519 this->token_array_.size (this->token_array_.size ()*2);
04520 result = this->token_array_.set (token, count);
04521 if (result == -1)
04522 ACE_ERROR_RETURN ((LM_ERROR, "TAO_Tokenizer::parse error"), -1);
04523 }
04524 count++;
04525 }
04526
04527
04528
04529
04530
04531
04532
04533
04534
04535
04536
04537
04538
04539
04540
04541
04542
04543
04544
04545
04546
04547 this->num_tokens_ = count;
04548 return 0;
04549 }
04550
04551 char*
04552 TAO_Tokenizer::token (void)
04553 {
04554 if (count_ < num_tokens_)
04555 return CORBA::string_dup (this->token_array_[this->count_++]);
04556 else
04557 return 0;
04558 }
04559
04560 int
04561 TAO_Tokenizer::num_tokens (void)
04562 {
04563 return static_cast<int> (this->num_tokens_);
04564 }
04565
04566 const char *
04567 TAO_Tokenizer::operator [] (size_t index) const
04568 {
04569 if (index >= this->num_tokens_)
04570 return 0;
04571
04572 return this->token_array_[index];
04573 }
04574
04575 TAO_END_VERSIONED_NAMESPACE_DECL