00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "orbsvcs/AV/AVStreams_i.h"
00014 #include "orbsvcs/AV/sfp.h"
00015 #include "orbsvcs/AV/MCast.h"
00016 #include "orbsvcs/AV/RTCP.h"
00017
00018 #include "tao/debug.h"
00019 #include "tao/ORB_Core.h"
00020 #include "tao/AnyTypeCode/Any.h"
00021 #include "ace/OS_NS_arpa_inet.h"
00022
00023 #if !defined (__ACE_INLINE__)
00024 #include "orbsvcs/AV/AVStreams_i.inl"
00025 #endif
00026
00027 ACE_RCSID (AV,
00028 AVStreams_i,
00029 "$Id: AVStreams_i.cpp 78820 2007-07-07 20:06:46Z sowayaa $")
00030
00031 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00032
00033
00034
00035
00036
00037 TAO_AV_QoS::TAO_AV_QoS (void)
00038 {
00039 }
00040
00041 TAO_AV_QoS::TAO_AV_QoS (AVStreams::streamQoS &stream_qos)
00042 {
00043 this->set (stream_qos);
00044 }
00045
00046 int
00047 TAO_AV_QoS::convert (AVStreams::streamQoS &)
00048 {
00049 return -1;
00050 }
00051
00052
00053
00054
00055
00056 AV_Null_MediaCtrl::AV_Null_MediaCtrl (void)
00057 {
00058 }
00059
00060 AV_Null_MediaCtrl::~AV_Null_MediaCtrl (void)
00061 {
00062 }
00063
00064
00065
00066
00067
00068
00069
00070 TAO_Basic_StreamCtrl::TAO_Basic_StreamCtrl (void)
00071 :flow_count_ (0)
00072 {
00073 }
00074
00075
00076
00077
00078 void
00079 TAO_Basic_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec)
00080 {
00081 try
00082 {
00083
00084
00085 if (this->flow_connection_map_.current_size () > 0)
00086 {
00087 if (flow_spec.length () > 0)
00088 for (u_int i=0;i<flow_spec.length ();i++)
00089 {
00090 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00091 ACE_CString flow_name_key (flowname);
00092 AVStreams::FlowConnection_var flow_connection_entry;
00093 if (this->flow_connection_map_.find (flow_name_key,
00094 flow_connection_entry) == 0)
00095 {
00096 flow_connection_entry->stop ();
00097 }
00098 }
00099 else
00100 {
00101
00102 FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00103 FlowConnection_Map_Entry *entry;
00104 for (;iterator.next (entry) != 0;iterator.advance ())
00105 {
00106 entry->int_id_->stop ();
00107 }
00108 }
00109 }
00110 }
00111 catch(AVStreams::noSuchFlow& ex)
00112 {
00113 throw;
00114 }
00115 catch (const CORBA::Exception& ex)
00116 {
00117 ex._tao_print_exception ("TAO_Basic_StreamCtrl::stop");
00118 throw;
00119 }
00120 catch(...)
00121 {
00122 printf ("TAO_Basic_StreamCtrl::stop - unknown exception\n");
00123 }
00124 }
00125
00126
00127
00128 void
00129 TAO_Basic_StreamCtrl::start (const AVStreams::flowSpec &flow_spec)
00130 {
00131 try
00132 {
00133
00134
00135
00136 if (this->flow_connection_map_.current_size () > 0)
00137 {
00138 if (flow_spec.length () > 0)
00139 for (u_int i = 0; i < flow_spec.length (); i++)
00140 {
00141 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00142 ACE_CString flow_name_key (flowname);
00143 FlowConnection_Map::ENTRY *flow_connection_entry = 0;
00144 if (this->flow_connection_map_.find (flow_name_key,
00145 flow_connection_entry) == 0)
00146 {
00147 flow_connection_entry->int_id_->start ();
00148 }
00149 }
00150 else
00151 {
00152
00153 FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00154 FlowConnection_Map_Entry *entry = 0;
00155 for (;iterator.next (entry) != 0;iterator.advance ())
00156 {
00157 entry->int_id_->start ();
00158 }
00159 }
00160 }
00161 }
00162 catch(AVStreams::noSuchFlow& ex)
00163 {
00164 throw;
00165 }
00166 catch (const CORBA::Exception& ex)
00167 {
00168 ex._tao_print_exception ("TAO_Basic_StreamCtrl::start");
00169 throw;
00170 }
00171 catch(...)
00172 {
00173 printf ("TAO_Basic_StreamCtrl::start - unknown exception\n");
00174 }
00175 }
00176
00177
00178
00179
00180
00181 void
00182 TAO_Basic_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec)
00183 {
00184 try
00185 {
00186
00187 if (this->flow_connection_map_.current_size () > 0)
00188 {
00189 if (flow_spec.length () > 0)
00190 {
00191 for (u_int i=0;i<flow_spec.length ();i++)
00192 {
00193 char *flowname = TAO_AV_Core::get_flowname (flow_spec[i]);
00194 ACE_CString flow_name_key (flowname);
00195 FlowConnection_Map::ENTRY *flow_connection_entry = 0;
00196 if (this->flow_connection_map_.find (flow_name_key, flow_connection_entry) == 0)
00197 {
00198 flow_connection_entry->int_id_->destroy ();
00199 }
00200 }
00201 }
00202 else
00203 {
00204
00205 FlowConnection_Map_Iterator iterator (this->flow_connection_map_);
00206 FlowConnection_Map_Entry *entry = 0;
00207 for (;iterator.next (entry) != 0;iterator.advance ())
00208 {
00209 entry->int_id_->destroy ();
00210 }
00211 }
00212 }
00213 }
00214 catch (const CORBA::Exception& ex)
00215 {
00216 ex._tao_print_exception ("TAO_Basic_StreamCtrl::destroy");
00217 return;
00218 }
00219 }
00220
00221
00222
00223 CORBA::Boolean
00224
00225 TAO_Basic_StreamCtrl::modify_QoS (AVStreams::streamQoS & ,
00226 const AVStreams::flowSpec &)
00227 {
00228 return 1;
00229 }
00230
00231
00232
00233 void
00234 TAO_Basic_StreamCtrl::push_event (const struct CosPropertyService::Property &)
00235 {
00236 if (TAO_debug_level > 0)
00237 ACE_DEBUG ((LM_DEBUG, "\n(%P|%t) Recieved event \""));
00238 }
00239
00240
00241 void
00242 TAO_Basic_StreamCtrl::set_FPStatus (const AVStreams::flowSpec &flow_spec,
00243 const char *fp_name,
00244 const CORBA::Any &fp_settings)
00245
00246 {
00247 if (!CORBA::is_nil (this->sep_a_.in ()))
00248 {
00249 this->sep_a_->set_FPStatus (flow_spec, fp_name, fp_settings);
00250 }
00251 }
00252
00253
00254 CORBA::Object_ptr
00255 TAO_Basic_StreamCtrl::get_flow_connection (const char *flow_name)
00256 {
00257 ACE_CString flow_name_key (flow_name);
00258 AVStreams::FlowConnection_var flow_connection_entry;
00259
00260 if (this->flow_connection_map_.find (flow_name_key, flow_connection_entry) == 0){
00261 return flow_connection_entry._retn();
00262 }
00263 else{
00264 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) Cannot find flow: %s\n", flow_name ));
00265 throw AVStreams::noSuchFlow ();
00266 }
00267 }
00268
00269
00270 void
00271 TAO_Basic_StreamCtrl::set_flow_connection (const char *flow_name,
00272 CORBA::Object_ptr flow_connection_obj)
00273 {
00274 AVStreams::FlowConnection_var flow_connection;
00275 try
00276 {
00277 flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj);
00278 }
00279 catch (const CORBA::Exception& ex)
00280 {
00281 ex._tao_print_exception (
00282 "TAO_Basic_StreamCtrl::set_flow_connection");
00283 return;
00284 }
00285
00286 this->flows_.length (this->flow_count_ + 1);
00287 this->flows_ [this->flow_count_++] = CORBA::string_dup (flow_name);
00288 ACE_CString flow_name_key (flow_name);
00289 if (this->flow_connection_map_.bind (flow_name_key, flow_connection) != 0)
00290 {
00291 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) Cannot find flow: %s\n", flow_name ));
00292 throw AVStreams::noSuchFlow ();
00293 }
00294 }
00295
00296 TAO_Basic_StreamCtrl::~TAO_Basic_StreamCtrl (void)
00297 {
00298 }
00299
00300
00301
00302
00303
00304 CORBA::Boolean
00305 TAO_Negotiator::negotiate (AVStreams::Negotiator_ptr ,
00306 const AVStreams::streamQoS &)
00307 {
00308 ACE_DEBUG ((LM_DEBUG,
00309 "TAO_Negotiator::negotiate\n"));
00310 return 0;
00311 }
00312
00313
00314
00315
00316
00317 const int MMDevice_Map_Hash_Key::hash_maximum_ = 10000;
00318
00319
00320 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (void)
00321 {
00322 this->mmdevice_ = AVStreams::MMDevice::_nil ();
00323 }
00324
00325
00326 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (AVStreams::MMDevice_ptr mmdevice)
00327 {
00328 this->mmdevice_ = AVStreams::MMDevice::_duplicate (mmdevice);
00329 }
00330
00331
00332 MMDevice_Map_Hash_Key::MMDevice_Map_Hash_Key (const MMDevice_Map_Hash_Key& hash_key)
00333 {
00334 this->mmdevice_ = AVStreams::MMDevice::_duplicate (hash_key.mmdevice_);
00335 }
00336
00337
00338 MMDevice_Map_Hash_Key::~MMDevice_Map_Hash_Key (void)
00339 {
00340 CORBA::release (this->mmdevice_);
00341 }
00342
00343 bool
00344 MMDevice_Map_Hash_Key::operator == (const MMDevice_Map_Hash_Key &hash_key) const
00345 {
00346 CORBA::Boolean result = 0;
00347 try
00348 {
00349 result =
00350 this->mmdevice_->_is_equivalent (hash_key.mmdevice_);
00351 }
00352 catch (const CORBA::Exception& ex)
00353 {
00354 ex._tao_print_exception (
00355 "MMDevice_Map_Hash_Key::operator == ");
00356 return false;
00357 }
00358
00359 return result;
00360 }
00361
00362 bool
00363 operator < (const MMDevice_Map_Hash_Key &left,
00364 const MMDevice_Map_Hash_Key &right)
00365 {
00366 bool result = false;
00367
00368 try
00369 {
00370 const CORBA::ULong left_hash =
00371 left.mmdevice_->_hash (left.hash_maximum_);
00372
00373 const CORBA::ULong right_hash =
00374 right.mmdevice_->_hash (right.hash_maximum_);
00375
00376 result = left_hash < right_hash;
00377 }
00378 catch (const CORBA::Exception& ex)
00379 {
00380 ex._tao_print_exception ("operator < for MMDevice_Map_Hash_Key");
00381 return false;
00382 }
00383
00384 return result;
00385 }
00386
00387 u_long
00388 MMDevice_Map_Hash_Key::hash (void) const
00389 {
00390 u_long result = 0;
00391 try
00392 {
00393 result = this->mmdevice_->_hash (this->hash_maximum_);
00394 }
00395 catch (const CORBA::Exception& ex)
00396 {
00397 ex._tao_print_exception ("MMDevice_Map_Hash_Key::hash");
00398 return 0;
00399 }
00400 return result;
00401 }
00402
00403
00404
00405
00406
00407 TAO_StreamCtrl::TAO_StreamCtrl (void)
00408 :mcastconfigif_ (0)
00409 {
00410 try
00411 {
00412 this->streamctrl_ = this->_this ();
00413 char buf [BUFSIZ];
00414 int result = ACE_OS::hostname (buf, BUFSIZ);
00415 unsigned long ipaddr = 0;
00416 if (result == 0)
00417 ipaddr = ACE_OS::inet_addr (buf);
00418 this->source_id_ = TAO_AV_RTCP::alloc_srcid (ipaddr);
00419 }
00420 catch (const CORBA::Exception& ex)
00421 {
00422 ex._tao_print_exception ("TAO_StreamCtrl::TAO_StreamCtrl");
00423 }
00424 }
00425
00426 TAO_StreamCtrl::~TAO_StreamCtrl (void)
00427 {
00428 delete this->mcastconfigif_;
00429 }
00430
00431
00432
00433
00434 void
00435 TAO_StreamCtrl::stop (const AVStreams::flowSpec &flow_spec)
00436 {
00437 try
00438 {
00439 TAO_Basic_StreamCtrl::stop (flow_spec);
00440 if (this->flow_connection_map_.current_size () > 0)
00441 return;
00442 MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00443 MMDevice_Map::ENTRY *entry = 0;
00444 for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00445 {
00446 entry->int_id_.sep_->stop (flow_spec);
00447 }
00448 MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00449 for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00450 {
00451 entry->int_id_.sep_->stop (flow_spec);
00452 }
00453 }
00454 catch(AVStreams::noSuchFlow& ex)
00455 {
00456 throw;
00457 }
00458 catch (const CORBA::Exception& ex)
00459 {
00460 ex._tao_print_exception ("TAO_StreamCtrl::stop");
00461 throw;
00462 }
00463 catch(...)
00464 {
00465 printf ("TAO_StreamCtrl::stop - unknow exception\n");
00466 }
00467 }
00468
00469
00470
00471 void
00472 TAO_StreamCtrl::start (const AVStreams::flowSpec &flow_spec)
00473 {
00474 try
00475 {
00476 TAO_Basic_StreamCtrl::start (flow_spec);
00477 if (this->flow_connection_map_.current_size () > 0)
00478 return;
00479
00480 MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00481 MMDevice_Map::ENTRY *entry = 0;
00482 for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00483 {
00484 entry->int_id_.sep_->start (flow_spec);
00485 }
00486 MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00487 for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00488 {
00489 entry->int_id_.sep_->start (flow_spec);
00490 }
00491 }
00492 catch(AVStreams::noSuchFlow& ex)
00493 {
00494 throw;
00495 }
00496 catch (const CORBA::Exception& ex)
00497 {
00498 ex._tao_print_exception ("TAO_StreamCtrl::start");
00499 throw;
00500 }
00501 catch(...)
00502 {
00503 printf ("TAO_StreamCtrl::start - unknow exception\n");
00504 }
00505 }
00506
00507
00508
00509
00510 void
00511 TAO_StreamCtrl::destroy (const AVStreams::flowSpec &flow_spec)
00512 {
00513 try
00514 {
00515 TAO_Basic_StreamCtrl::destroy (flow_spec);
00516 if (this->flow_connection_map_.current_size () > 0)
00517 return;
00518
00519 MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
00520 MMDevice_Map::ENTRY *entry = 0;
00521 for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
00522 {
00523 entry->int_id_.sep_->destroy (flow_spec);
00524 }
00525 MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
00526 for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
00527 {
00528 entry->int_id_.sep_->destroy (flow_spec);
00529 }
00530 }
00531 catch (const CORBA::Exception& ex)
00532 {
00533 ex._tao_print_exception ("TAO_StreamCtrl::destroy");
00534 return;
00535 }
00536
00537 int result = TAO_AV_Core::deactivate_servant (this);
00538 if (result < 0)
00539 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::destroy failed\n"));
00540 }
00541
00542
00543
00544
00545
00546 CORBA::Boolean
00547 TAO_StreamCtrl::bind_devs (AVStreams::MMDevice_ptr a_party,
00548 AVStreams::MMDevice_ptr b_party,
00549 AVStreams::streamQoS &the_qos,
00550 const AVStreams::flowSpec &the_flows)
00551 {
00552 try
00553 {
00554 if (CORBA::is_nil (a_party) && CORBA::is_nil (b_party))
00555 ACE_ERROR_RETURN ((LM_ERROR, "Both parties are nil\n"), 0);
00556
00557 if (TAO_debug_level > 0)
00558 if (CORBA::is_nil (a_party) ||
00559 CORBA::is_nil (b_party))
00560 if (TAO_debug_level > 0)
00561 ACE_DEBUG ((LM_DEBUG,
00562 "(%P|%t) TAO_StreamCtrl::bind_devs: "
00563 "a_party or b_party is null"
00564 "Multicast mode\n"));
00565
00566
00567 CORBA::Boolean met_qos;
00568 CORBA::String_var named_vdev;
00569
00570 if (!CORBA::is_nil (a_party))
00571 {
00572 MMDevice_Map_Hash_Key find_key (a_party);
00573 MMDevice_Map_Entry find_entry;
00574 int result =
00575 this->mmdevice_a_map_.find (find_key, find_entry);
00576 if (result == 0)
00577 {
00578 if (TAO_debug_level > 0)
00579 {
00580
00581 if (TAO_debug_level > 0)
00582 ACE_DEBUG ((LM_DEBUG, "mmdevice a_party is already bound\n"));
00583 }
00584 return 1;
00585 }
00586 else
00587 {
00588 this->sep_a_ =
00589 a_party-> create_A (this->streamctrl_.in (),
00590 this->vdev_a_.out (),
00591 the_qos,
00592 met_qos,
00593 named_vdev.inout (),
00594 the_flows);
00595
00596 if (TAO_debug_level > 0)
00597 ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamCtrl::create_A: succeeded\n"));
00598
00599 CORBA::Any streamctrl_any;
00600 streamctrl_any <<= this->streamctrl_.in ();
00601 this->sep_a_->define_property ("Related_StreamCtrl",
00602 streamctrl_any);
00603
00604 CORBA::Any vdev_a_any;
00605 vdev_a_any <<= this->vdev_a_.in ();
00606 this->sep_a_->define_property ("Related_VDev",
00607 vdev_a_any);
00608
00609 CORBA::Any streamendpoint_a_any;
00610 streamendpoint_a_any <<= this->sep_a_.in ();
00611 this->vdev_a_->define_property ("Related_StreamEndpoint",
00612 streamendpoint_a_any);
00613
00614
00615 CORBA::Any mmdevice_a_any;
00616 mmdevice_a_any <<= a_party;
00617 this->vdev_a_->define_property ("Related_MMDevice",
00618 mmdevice_a_any);
00619
00620
00621 MMDevice_Map_Entry map_entry;
00622 MMDevice_Map_Hash_Key key (a_party);
00623 map_entry.sep_ = AVStreams::StreamEndPoint_A::_duplicate (this->sep_a_.in ());
00624 map_entry.vdev_ = AVStreams::VDev::_duplicate (this->vdev_a_.in ());
00625 map_entry.flowspec_ = the_flows;
00626 map_entry.qos_ = the_qos;
00627 result =
00628 this->mmdevice_a_map_.bind (key, map_entry);
00629 if (result < 0)
00630 if (TAO_debug_level > 0)
00631 ACE_DEBUG ((LM_DEBUG, "Error binding mmdevice entry in the a_map"));
00632 }
00633 }
00634
00635 if (!CORBA::is_nil (b_party))
00636 {
00637 MMDevice_Map_Hash_Key find_key (b_party);
00638 MMDevice_Map_Entry find_entry;
00639 int result =
00640 this->mmdevice_b_map_.find (find_key, find_entry);
00641 if (result == 0)
00642 {
00643
00644 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "mmdevice b_party is already bound\n"));
00645 return 1;
00646 }
00647 else
00648 {
00649 this->sep_b_ =
00650 b_party-> create_B (this->streamctrl_.in (),
00651 this->vdev_b_.out (),
00652 the_qos,
00653 met_qos,
00654 named_vdev.inout (),
00655 the_flows);
00656
00657 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamCtrl::create_B: succeeded\n"));
00658
00659 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
00660 "\n(%P|%t)stream_endpoint_b_ = %s",
00661 TAO_ORB_Core_instance ()->orb ()->object_to_string (this->sep_b_.in ())));
00662
00663 CORBA::Any streamctrl_any;
00664 streamctrl_any <<= this->streamctrl_.in ();
00665 this->sep_b_->define_property ("Related_StreamCtrl",
00666 streamctrl_any);
00667
00668 CORBA::Any vdev_b_any;
00669 vdev_b_any <<= this->vdev_b_.in ();
00670 this->sep_b_->define_property ("Related_VDev",
00671 vdev_b_any);
00672
00673 CORBA::Any streamendpoint_b_any;
00674 streamendpoint_b_any <<= this->sep_b_.in ();
00675 this->vdev_b_->define_property ("Related_StreamEndpoint",
00676 streamendpoint_b_any);
00677
00678
00679 CORBA::Any mmdevice_b_any;
00680 mmdevice_b_any <<= b_party;
00681 this->vdev_b_->define_property ("Related_MMDevice",
00682 mmdevice_b_any);
00683
00684 MMDevice_Map_Entry map_entry;
00685 MMDevice_Map_Hash_Key key (b_party);
00686 map_entry.sep_ = AVStreams::StreamEndPoint::_duplicate (this->sep_b_.in ());
00687 map_entry.vdev_ = AVStreams::VDev::_duplicate(this->vdev_b_.in ());
00688 map_entry.flowspec_ = the_flows;
00689 map_entry.qos_ = the_qos;
00690 int result =
00691 this->mmdevice_b_map_.bind (key, map_entry);
00692 if (result < 0)
00693 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Error binding mmdevice entry in the b_map"));
00694 }
00695 }
00696
00697
00698 if ((!CORBA::is_nil (a_party)) && (!CORBA::is_nil (b_party)))
00699 {
00700 CORBA::Any sep_a_peer_any;
00701 CORBA::Any sep_b_peer_any;
00702
00703 sep_a_peer_any <<= this->sep_b_.in();
00704 sep_b_peer_any <<= this->sep_a_.in();
00705 this->sep_a_->define_property ("PeerAdapter",
00706 sep_a_peer_any);
00707
00708 this->sep_b_->define_property ("PeerAdapter",
00709 sep_b_peer_any);
00710 }
00711
00712
00713 if (CORBA::is_nil (b_party) && (!CORBA::is_nil (this->vdev_a_.in ())))
00714 {
00715
00716
00717
00718 try
00719 {
00720 CORBA::Any_ptr flows_any = this->sep_a_->get_property_value ("Flows");
00721 AVStreams::flowSpec_var flows;
00722 *flows_any >>= flows.out ();
00723 for (CORBA::ULong i=0; i< flows->length ();++i)
00724 {
00725 CORBA::Object_var fep_obj =
00726 this->sep_a_->get_fep (flows [i]);
00727 try
00728 {
00729 AVStreams::FlowProducer_var producer =
00730 AVStreams::FlowProducer::_narrow (fep_obj.in ());
00731 producer->set_source_id (this->source_id_++);
00732 }
00733 catch (const CORBA::Exception& ex)
00734 {
00735 if (TAO_debug_level > 0)
00736 ACE_DEBUG ((LM_DEBUG, " %s ", static_cast<char const*>(flows[i])));
00737
00738 ex._tao_print_exception (
00739 "producer_check: not a producer");
00740
00741 }
00742 }
00743 }
00744 catch (const CORBA::Exception&)
00745 {
00746
00747
00748
00749
00750
00751
00752 this->sep_a_->set_source_id (this->source_id_++);
00753 }
00754 if (!this->mcastconfigif_)
00755 {
00756 ACE_NEW_RETURN (this->mcastconfigif_,
00757 TAO_MCastConfigIf,
00758 0);
00759
00760 this->mcastconfigif_ptr_ = this->mcastconfigif_->_this ();
00761 }
00762
00763 CORBA::Boolean result = this->vdev_a_->set_Mcast_peer (this->streamctrl_.in (),
00764 this->mcastconfigif_ptr_.in (),
00765 the_qos,
00766 the_flows);
00767 if (!result)
00768 ACE_ERROR_RETURN ((LM_ERROR, "set_Mcast_peer failed\n"), 0);
00769 }
00770
00771 if (CORBA::is_nil (a_party))
00772 {
00773 if (!CORBA::is_nil (this->vdev_b_.in ()))
00774 {
00775
00776 if (!this->mcastconfigif_)
00777 ACE_ERROR_RETURN ((LM_ERROR, "first add a source and then a sink\n"), 0);
00778 this->mcastconfigif_->set_peer (this->vdev_b_.in (),
00779 the_qos,
00780 the_flows);
00781 }
00782
00783 int connect_leaf_success = 0;
00784 try
00785 {
00786
00787
00788 connect_leaf_success = this->sep_a_->connect_leaf (this->sep_b_.in (),
00789 the_qos,
00790 the_flows);
00791 connect_leaf_success = 1;
00792 }
00793 catch (const AVStreams::notSupported&)
00794 {
00795 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "connect_leaf failed\n"));
00796 connect_leaf_success = 0;
00797 }
00798 catch (const CORBA::Exception& ex)
00799 {
00800 ex._tao_print_exception (
00801 "TAO_StreamCtrl::bind_devs");
00802 }
00803 if (!connect_leaf_success)
00804 {
00805 if (TAO_debug_level > 0)
00806 ACE_DEBUG ((LM_DEBUG,"TAO_StreamCtrl::bind_devs Multiconnect\n"));
00807 AVStreams::flowSpec connect_flows = the_flows;
00808 this->sep_a_->multiconnect (the_qos, connect_flows);
00809 this->sep_b_->multiconnect (the_qos, connect_flows);
00810 }
00811 }
00812
00813 if (!CORBA::is_nil (a_party) && !CORBA::is_nil (b_party))
00814 {
00815
00816
00817
00818
00819 if( a_party->is_property_defined("Flows") &&
00820 b_party->is_property_defined("Flows") )
00821 {
00822 if (TAO_debug_level > 0) {
00823
00824 ACE_DEBUG ((LM_DEBUG, "(%N,%l) Full profile, invoking bind()\n"));
00825
00826 }
00827
00828
00829
00830
00831 this->bind (this->sep_a_.in (),
00832 this->sep_b_.in (),
00833 the_qos,
00834 the_flows);
00835
00836
00837
00838 }
00839
00840 else if (!CORBA::is_nil (this->vdev_a_.in ()) && !CORBA::is_nil (this->vdev_b_.in ()))
00841 {
00842 if (TAO_debug_level > 0) {
00843
00844 ACE_DEBUG ((LM_DEBUG, "(%N,%l) Light profile, invoking connect()\n"));
00845
00846 }
00847
00848
00849 this->vdev_a_->set_peer (this->streamctrl_.in (),
00850 this->vdev_b_.in (),
00851 the_qos,
00852 the_flows);
00853
00854 this->vdev_b_->set_peer (this->streamctrl_.in (),
00855 this->vdev_a_.in (),
00856 the_qos,
00857 the_flows);
00858
00859
00860
00861
00862 CORBA::Boolean result =
00863 this->sep_a_->connect (this->sep_b_.in (),
00864 the_qos,
00865 the_flows);
00866 if (result == 0)
00867 ACE_ERROR_RETURN ((LM_ERROR, "sep_a->connect (sep_b) failed\n"), 0);
00868 }
00869 }
00870 }
00871 catch (const CORBA::Exception& ex)
00872 {
00873 ex._tao_print_exception ("TAO_StreamCtrl::bind_devs");
00874 return 0;
00875 }
00876 return 1;
00877 }
00878
00879
00880
00881 CORBA::Boolean
00882 TAO_StreamCtrl::bind (AVStreams::StreamEndPoint_A_ptr sep_a,
00883 AVStreams::StreamEndPoint_B_ptr sep_b,
00884 AVStreams::streamQoS &stream_qos,
00885 const AVStreams::flowSpec &flow_spec)
00886 {
00887 this->sep_a_ = AVStreams::StreamEndPoint_A::_duplicate(sep_a);
00888 this->sep_b_ = AVStreams::StreamEndPoint_B::_duplicate(sep_b);
00889
00890 int result = 0;
00891 try
00892 {
00893 if (CORBA::is_nil (sep_a_.in() ) ||
00894 CORBA::is_nil (sep_b_.in() ))
00895 ACE_ERROR_RETURN ((LM_ERROR,
00896 "(%P|%t) TAO_StreamCtrl::bind:"
00897 "a_party or b_party null!"),
00898 0);
00899
00900
00901 CORBA::Any sep_any;
00902 sep_any <<= sep_b;
00903 sep_a_->define_property ("PeerAdapter",
00904 sep_any);
00905 sep_any <<= sep_a;
00906 sep_b_->define_property ("PeerAdapter",
00907 sep_any);
00908
00909
00910
00911 AVStreams::flowSpec a_flows, b_flows;
00912 CORBA::Any_var flows_any;
00913 flows_any = sep_a_->get_property_value ("Flows");
00914 AVStreams::flowSpec *temp_flows;
00915 flows_any.in () >>= temp_flows;
00916 a_flows = *temp_flows;
00917 flows_any = sep_b_->get_property_value ("Flows");
00918 flows_any.in () >>= temp_flows;
00919 b_flows = *temp_flows;
00920 u_int i;
00921 FlowEndPoint_Map *a_fep_map;
00922 FlowEndPoint_Map *b_fep_map;
00923 ACE_NEW_RETURN (a_fep_map,
00924 FlowEndPoint_Map,
00925 0);
00926 ACE_NEW_RETURN (b_fep_map,
00927 FlowEndPoint_Map,
00928 0);
00929 for (i=0;i<a_flows.length ();i++)
00930 {
00931 const char *flowname = a_flows[i];
00932
00933 CORBA::Object_var fep_obj =
00934 sep_a_->get_fep (flowname);
00935 AVStreams::FlowEndPoint_var fep =
00936 AVStreams::FlowEndPoint::_narrow (fep_obj.in ());
00937 ACE_CString fep_key (flowname);
00938 result = a_fep_map->bind (fep_key, fep);
00939 if (result == -1)
00940 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::bind failed for %s\n", flowname));
00941 }
00942
00943 for (i=0;i<b_flows.length ();i++)
00944 {
00945 const char *flowname = b_flows[i];
00946
00947 CORBA::Object_var fep_obj =
00948 sep_b->get_fep (flowname);
00949 AVStreams::FlowEndPoint_var fep =
00950 AVStreams::FlowEndPoint::_narrow (fep_obj.in ());
00951 ACE_CString fep_key (flowname);
00952 result = b_fep_map->bind (fep_key, fep);
00953 if (result == -1)
00954 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamCtrl::bind failed for %s\n", flowname));
00955 }
00956 FlowEndPoint_Map *map_a = 0, *map_b = 0;
00957 if (flow_spec.length () == 0)
00958 {
00959 map_a = a_fep_map;
00960 map_b = b_fep_map;
00961 }
00962 else
00963 {
00964 FlowEndPoint_Map *spec_fep_map_a, *spec_fep_map_b;
00965 ACE_NEW_RETURN (spec_fep_map_a,
00966 FlowEndPoint_Map,
00967 0);
00968 ACE_NEW_RETURN (spec_fep_map_b,
00969 FlowEndPoint_Map,
00970 0);
00971 for (i=0; i< flow_spec.length ();i++)
00972 {
00973 TAO_Forward_FlowSpec_Entry *entry = 0;
00974 ACE_NEW_RETURN (entry,
00975 TAO_Forward_FlowSpec_Entry,
00976 0);
00977 entry->parse (flow_spec[i]);
00978 ACE_CString fep_key (entry->flowname ());
00979 AVStreams::FlowEndPoint_var fep;
00980 result = a_fep_map->find (fep_key, fep);
00981 if (result == -1)
00982 ACE_ERROR_RETURN ((LM_ERROR, "Fep not found on A side for flowname: %s\n", flow_spec[i].in ()), 0);
00983
00984 result = spec_fep_map_a->bind (fep_key, fep);
00985 if (result == -1)
00986 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Bind failed for %s\n", flow_spec[i].in ()));
00987
00988 result = b_fep_map->find (fep_key, fep);
00989 if (result == -1)
00990 ACE_ERROR_RETURN ((LM_ERROR, "Fep not found on B side for flowname: %s\n", flow_spec[i].in ()), 0);
00991
00992 result = spec_fep_map_b->bind (fep_key, fep);
00993 if (result == -1)
00994 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Bind failed for %s\n", flow_spec[i].in ()));
00995 }
00996 map_a = spec_fep_map_a;
00997 map_b = spec_fep_map_b;
00998 }
00999
01000 TAO_AV_QoS qos (stream_qos);
01001
01002
01003 FlowEndPoint_Map_Iterator a_feps_iterator (*map_a);
01004 FlowEndPoint_Map_Entry *a_feps_entry, *b_feps_entry;
01005 try
01006 {
01007
01008 for (;a_feps_iterator.next (a_feps_entry) != 0;
01009 a_feps_iterator.advance ())
01010 {
01011 AVStreams::FlowEndPoint_var fep_a = a_feps_entry->int_id_;
01012 AVStreams::FlowEndPoint_var connected_to =
01013 fep_a->get_connected_fep ();
01014
01015 if (!CORBA::is_nil (connected_to.in ()))
01016 {
01017
01018 continue;
01019 }
01020
01021 FlowEndPoint_Map_Iterator b_feps_iterator (*map_b);
01022 for (;b_feps_iterator.next (b_feps_entry) != 0;
01023 b_feps_iterator.advance ())
01024 {
01025 AVStreams::FlowEndPoint_var fep_b = b_feps_entry->int_id_;
01026 AVStreams::FlowConnection_var flow_connection;
01027
01028 AVStreams::FlowEndPoint_var connected_to =
01029 fep_b->get_connected_fep ();
01030
01031 if (!CORBA::is_nil (connected_to.in ()))
01032 {
01033
01034 continue;
01035 }
01036
01037 if (fep_a->is_fep_compatible (fep_b.in()) == 1)
01038 {
01039
01040
01041 CORBA::Object_var flow_connection_obj;
01042 CORBA::Any_var flowname_any =
01043 fep_a->get_property_value ("FlowName");
01044 const char *flowname = 0;
01045 flowname_any.in () >>= flowname;
01046 try
01047 {
01048 flow_connection_obj =
01049 this->get_flow_connection (flowname);
01050 flow_connection =
01051 AVStreams::FlowConnection::_narrow (flow_connection_obj.in ());
01052 }
01053 catch (const CORBA::Exception&)
01054 {
01055 TAO_FlowConnection *flowConnection;
01056 ACE_NEW_RETURN (flowConnection,
01057 TAO_FlowConnection,
01058 0);
01059 flow_connection = flowConnection->_this ();
01060 this->set_flow_connection (flowname,
01061 flow_connection.in ());
01062 }
01063
01064
01065
01066
01067
01068
01069
01070 AVStreams::FlowProducer_var producer;
01071 AVStreams::FlowConsumer_var consumer;
01072
01073 try
01074 {
01075 producer =
01076 AVStreams::FlowProducer::_narrow (fep_a.in());
01077 consumer =
01078 AVStreams::FlowConsumer::_narrow (fep_b.in());
01079
01080
01081
01082 if (CORBA::is_nil (producer.in ()))
01083 {
01084 producer =
01085 AVStreams::FlowProducer::_narrow (fep_b.in());
01086 consumer =
01087 AVStreams::FlowConsumer::_narrow (fep_a.in());
01088 }
01089
01090
01091
01092
01093 ACE_ASSERT (!CORBA::is_nil (producer.in ()));
01094 ACE_ASSERT (!CORBA::is_nil (consumer.in ()));
01095 }
01096 catch (const CORBA::Exception&)
01097 {
01098
01099 throw;
01100 }
01101 CORBA::String_var fep_a_name, fep_b_name;
01102 flowname_any = fep_a->get_property_value ("FlowName");
01103 const char *temp_name;
01104 flowname_any.in () >>= temp_name;
01105 fep_a_name = CORBA::string_dup (temp_name);
01106 flowname_any = fep_b->get_property_value ("FlowName");
01107 flowname_any.in () >>= temp_name;
01108 fep_b_name = CORBA::string_dup (temp_name);
01109 AVStreams::QoS flow_qos;
01110 flow_qos.QoSType = fep_a_name;
01111 flow_qos.QoSParams.length (0);
01112 result = qos.get_flow_qos (fep_a_name.in (), flow_qos);
01113 if (result == -1)
01114 {
01115 flow_qos.QoSType = fep_b_name;
01116 result = qos.get_flow_qos (fep_b_name.in (),
01117 flow_qos);
01118 if (result == -1 && TAO_debug_level > 0)
01119 ACE_DEBUG ((LM_DEBUG,
01120 "No QoS Specified for this flow <%s>\n", flowname));
01121 }
01122 flow_connection->connect (producer.in (),
01123 consumer.in (),
01124 flow_qos);
01125 }
01126 }
01127 }
01128 }
01129 catch (const CORBA::Exception& ex)
01130 {
01131 ex._tao_print_exception ("TAO_StreamCtrl::bind:flow_connect block");
01132 return 0;
01133 }
01134 }
01135 catch (const CORBA::Exception&)
01136 {
01137
01138
01139 this->sep_a_->connect (this->sep_b_.in (),
01140 stream_qos,
01141 flow_spec);
01142 }
01143 return 1;
01144 }
01145
01146 void
01147 TAO_StreamCtrl::unbind (void)
01148 {
01149 try
01150 {
01151 if (this->flow_connection_map_.current_size () > 0)
01152 return;
01153
01154 AVStreams::flowSpec flow_spec;
01155 flow_spec.length(0);
01156
01157 MMDevice_Map_Iterator a_iterator (this->mmdevice_a_map_);
01158 MMDevice_Map::ENTRY *entry = 0;
01159 for (;a_iterator.next (entry)!= 0;a_iterator.advance ())
01160 {
01161 entry->int_id_.sep_->destroy (flow_spec);
01162 }
01163 MMDevice_Map_Iterator b_iterator (this->mmdevice_b_map_);
01164 for (;b_iterator.next (entry)!= 0;b_iterator.advance ())
01165 {
01166 entry->int_id_.sep_->destroy (flow_spec);
01167 }
01168 }
01169 catch (const CORBA::Exception& ex)
01170 {
01171 ex._tao_print_exception ("TAO_StreamCtrl::unbind");
01172 return;
01173 }
01174 }
01175
01176 void
01177 TAO_StreamCtrl::unbind_party (AVStreams::StreamEndPoint_ptr ,
01178 const AVStreams::flowSpec &)
01179 {
01180 }
01181
01182 void
01183 TAO_StreamCtrl::unbind_dev (AVStreams::MMDevice_ptr ,
01184 const AVStreams::flowSpec & )
01185 {
01186 }
01187
01188 AVStreams::VDev_ptr
01189 TAO_StreamCtrl::get_related_vdev (AVStreams::MMDevice_ptr adev,
01190 AVStreams::StreamEndPoint_out sep)
01191 {
01192 MMDevice_Map_Hash_Key key (adev);
01193 MMDevice_Map_Entry entry;
01194 int result = -1;
01195 result = this->mmdevice_a_map_.find (key, entry);
01196 if (result < 0)
01197 {
01198 result = this->mmdevice_a_map_.find (key, entry);
01199 if (result < 0)
01200 return AVStreams::VDev::_nil ();
01201 }
01202 sep = AVStreams::StreamEndPoint::_duplicate (entry.sep_.in ());
01203 return AVStreams::VDev::_duplicate (entry.vdev_.in ());
01204 }
01205
01206 CORBA::Boolean
01207
01208 TAO_StreamCtrl::modify_QoS (AVStreams::streamQoS &new_qos,
01209 const AVStreams::flowSpec &the_spec)
01210 {
01211 if (TAO_debug_level > 0)
01212 ACE_DEBUG ((LM_DEBUG,
01213 "TAO_StreamCtrl::modify_QoS\n"));
01214
01215
01216 if (this->mcastconfigif_ != 0)
01217 {
01218
01219 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Cannot Modify the Qos for multipoint streams\n"));
01220 }
01221 else
01222 {
01223 try
01224 {
01225 AVStreams::flowSpec in_flowspec;
01226 AVStreams::flowSpec out_flowspec;
01227
01228 in_flowspec.length (0);
01229 out_flowspec.length (0);
01230
01231 int in_index = 0;
01232 int out_index = 0;
01233
01234 AVStreams::flowSpec flowspec;
01235 if (the_spec.length () == 0)
01236 {
01237
01238 flowspec = this->flows_;
01239 MMDevice_Map_Iterator iterator (this->mmdevice_a_map_);
01240 MMDevice_Map::ENTRY *entry = 0;
01241 for (;iterator.next (entry) != 0;iterator.advance ())
01242 {
01243 flowspec = entry->int_id_.flowspec_;
01244 }
01245 }
01246 else
01247 {
01248 flowspec = the_spec;
01249 }
01250
01251 if (TAO_debug_level > 0)
01252 ACE_DEBUG ((LM_DEBUG,
01253 "TAO_StreamCtrl::modify_QoS\n"));
01254
01255
01256 for (u_int i=0;i < flowspec.length ();i++)
01257 {
01258 TAO_Forward_FlowSpec_Entry entry;
01259 entry.parse (flowspec [i]);
01260 int direction = entry.direction ();
01261 if (direction == 0)
01262 {
01263 in_flowspec.length (in_index + 1);
01264 in_flowspec [in_index++] = CORBA::string_dup (entry.entry_to_string ());
01265 }
01266 else
01267 {
01268 out_flowspec.length (out_index + 1);
01269 out_flowspec [out_index++] = CORBA::string_dup (entry.entry_to_string ());
01270 }
01271 }
01272
01273 if (in_flowspec.length () != 0)
01274 {
01275 this->vdev_a_->modify_QoS (new_qos, in_flowspec);
01276 }
01277
01278 if (out_flowspec.length () != 0)
01279 {
01280 this->vdev_b_->modify_QoS (new_qos, out_flowspec);
01281 }
01282 }
01283 catch (const CORBA::Exception& ex)
01284 {
01285 ex._tao_print_exception ("TAO_StreamCtrl::modify_QoS");
01286 return 0;
01287 }
01288
01289 }
01290 return 1;
01291 }
01292
01293
01294
01295
01296
01297 TAO_MCastConfigIf::TAO_MCastConfigIf (void)
01298 :peer_list_iterator_ (peer_list_)
01299 {
01300 }
01301
01302 TAO_MCastConfigIf::~TAO_MCastConfigIf (void)
01303 {
01304
01305 }
01306
01307
01308 CORBA::Boolean
01309 TAO_MCastConfigIf::set_peer (CORBA::Object_ptr peer,
01310 AVStreams::streamQoS & qos,
01311 const AVStreams::flowSpec & flow_spec)
01312 {
01313 try
01314 {
01315 Peer_Info *info;
01316 ACE_NEW_RETURN (info,
01317 Peer_Info,
01318 0);
01319 info->peer_ = AVStreams::VDev::_narrow (peer);
01320 info->qos_ = qos;
01321 info->flow_spec_ = flow_spec;
01322 this->peer_list_.insert_tail (info);
01323 }
01324 catch (const CORBA::Exception& ex)
01325 {
01326 ex._tao_print_exception ("TAO_MCastConfigIf::set_peer");
01327 return 0;
01328 }
01329 return 1;
01330 }
01331
01332
01333 void
01334 TAO_MCastConfigIf::configure (const CosPropertyService::Property & a_configuration)
01335 {
01336 Peer_Info *info;
01337 try
01338 {
01339 for (this->peer_list_iterator_.first ();
01340 (info = this->peer_list_iterator_.next ()) != 0;
01341 this->peer_list_iterator_.advance ())
01342 {
01343 info->peer_->configure (a_configuration);
01344 }
01345 }
01346 catch (const CORBA::Exception& ex)
01347 {
01348 ex._tao_print_exception (
01349 "TAO_MCastConfigIf::set_configure");
01350 return;
01351 }
01352 }
01353
01354
01355 void
01356 TAO_MCastConfigIf::set_initial_configuration (const CosPropertyService::Properties &initial)
01357 {
01358 this->initial_configuration_ = initial;
01359 }
01360
01361
01362 void
01363 TAO_MCastConfigIf::set_format (const char * flowName,
01364 const char * format_name)
01365 {
01366 Peer_Info *info;
01367 try
01368 {
01369 for (this->peer_list_iterator_.first ();
01370 (info = this->peer_list_iterator_.next ()) != 0;
01371 this->peer_list_iterator_.advance ())
01372 {
01373 if (this->in_flowSpec (info->flow_spec_, flowName))
01374 {
01375 info->peer_->set_format (flowName, format_name);
01376 }
01377 }
01378 }
01379 catch (const CORBA::Exception& ex)
01380 {
01381 ex._tao_print_exception ("TAO_MCastConfigIf::set_format");
01382 return;
01383 }
01384 }
01385
01386
01387 void
01388 TAO_MCastConfigIf::set_dev_params (const char * flowName,
01389 const CosPropertyService::Properties & new_params)
01390 {
01391 Peer_Info *info;
01392 try
01393 {
01394
01395 for (this->peer_list_iterator_.first ();
01396 (info = this->peer_list_iterator_.next ()) != 0;
01397 this->peer_list_iterator_.advance ())
01398 {
01399 if (this->in_flowSpec (info->flow_spec_, flowName))
01400 {
01401 info->peer_->set_dev_params (flowName, new_params);
01402 }
01403 }
01404 }
01405 catch (const CORBA::Exception& ex)
01406 {
01407 ex._tao_print_exception (
01408 "TAO_MCastConfigIf::set_dev_params");
01409 return;
01410 }
01411 }
01412
01413 int
01414 TAO_MCastConfigIf::in_flowSpec (const AVStreams::flowSpec& flow_spec, const char *flow_name)
01415 {
01416 size_t len = ACE_OS::strlen (flow_name);
01417 for (CORBA::ULong i = 0; i < flow_spec.length (); i++)
01418 if (ACE_OS::strncmp (flow_spec[i], flow_name, len) == 0)
01419 {
01420 return 1;
01421 }
01422 return 0;
01423 }
01424
01425
01426
01427
01428
01429 TAO_Base_StreamEndPoint::TAO_Base_StreamEndPoint (void)
01430 : protocol_object_set_ (0)
01431 {
01432 }
01433
01434 TAO_Base_StreamEndPoint::~TAO_Base_StreamEndPoint (void)
01435 {
01436 }
01437
01438 int
01439 TAO_Base_StreamEndPoint::handle_close (void)
01440 {
01441
01442
01443 return -1;
01444 }
01445
01446 int
01447 TAO_Base_StreamEndPoint::handle_open (void)
01448 {
01449 return 0;
01450 }
01451
01452 int
01453 TAO_Base_StreamEndPoint::handle_stop (const AVStreams::flowSpec &)
01454 {
01455 return 0;
01456 }
01457
01458 int
01459 TAO_Base_StreamEndPoint::handle_start (const AVStreams::flowSpec &)
01460 {
01461 return 0;
01462 }
01463
01464 int
01465 TAO_Base_StreamEndPoint::handle_destroy (const AVStreams::flowSpec &)
01466 {
01467 return 0;
01468 }
01469
01470
01471 CORBA::Boolean
01472 TAO_Base_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &)
01473 {
01474 return 1;
01475 }
01476
01477
01478 CORBA::Boolean
01479 TAO_Base_StreamEndPoint::handle_postconnect (AVStreams::flowSpec &)
01480 {
01481
01482 while (!this->is_protocol_object_set ())
01483 TAO_AV_CORE::instance ()->orb ()->perform_work ();
01484 return 1;
01485 }
01486
01487
01488 CORBA::Boolean
01489 TAO_Base_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec &)
01490 {
01491 return 1;
01492 }
01493
01494 int
01495 TAO_Base_StreamEndPoint::set_protocol_object (const char * ,
01496 TAO_AV_Protocol_Object * )
01497 {
01498 return -1;
01499 }
01500
01501 void
01502 TAO_Base_StreamEndPoint::protocol_object_set (void)
01503 {
01504 this->protocol_object_set_ = 1;
01505 }
01506
01507
01508 int
01509 TAO_Base_StreamEndPoint::is_protocol_object_set (void)
01510 {
01511 return this->protocol_object_set_;
01512 }
01513
01514 int
01515 TAO_Base_StreamEndPoint::get_callback (const char * ,
01516 TAO_AV_Callback *&)
01517 {
01518 return -1;
01519 }
01520
01521 int
01522 TAO_Base_StreamEndPoint::get_control_callback (const char * ,
01523 TAO_AV_Callback *&)
01524 {
01525 return -1;
01526 }
01527
01528 void
01529 TAO_Base_StreamEndPoint::set_flow_handler (const char *flowname,
01530 TAO_AV_Flow_Handler *handler)
01531 {
01532 if(TAO_debug_level > 1)
01533 {
01534 ACE_DEBUG ((LM_DEBUG, "(%N,%l) TAO_Base_StreamEndPoint::set_flow_handler(), flowname: %s\n", flowname));
01535 }
01536 ACE_CString flow_name_key (flowname);
01537 if (this->flow_handler_map_.bind (flow_name_key, handler) != 0)
01538 ACE_ERROR ((LM_ERROR,
01539 "Error in storing flow handler\n"));
01540 }
01541
01542 void
01543 TAO_Base_StreamEndPoint::set_control_flow_handler (const char *flowname,
01544 TAO_AV_Flow_Handler *handler)
01545 {
01546 ACE_CString flow_name_key (flowname);
01547 if (this->control_flow_handler_map_.bind (flow_name_key, handler) != 0)
01548 ACE_ERROR ((LM_ERROR,
01549 "Error in storing control flow handler\n"));
01550 }
01551
01552
01553
01554
01555
01556
01557
01558 TAO_StreamEndPoint::TAO_StreamEndPoint (void)
01559 :flow_count_ (0),
01560 flow_num_ (0),
01561 mcast_port_ (ACE_DEFAULT_MULTICAST_PORT+1)
01562 {
01563
01564 this->mcast_addr_.set (ACE_DEFAULT_MULTICAST_ADDR);
01565 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::TAO_StreamEndPoint::mcast_addr = %s", this->mcast_addr_.c_str ()));
01566
01567 }
01568
01569
01570 CORBA::Boolean
01571 TAO_StreamEndPoint::connect (AVStreams::StreamEndPoint_ptr responder,
01572 AVStreams::streamQoS &qos,
01573 const AVStreams::flowSpec &the_spec)
01574 {
01575 if (TAO_debug_level > 0)
01576 ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect ()\n"));
01577 CORBA::Boolean retv = 0;
01578 this->peer_sep_ = AVStreams::StreamEndPoint::_duplicate (responder);
01579 try
01580 {
01581 if (!CORBA::is_nil (this->negotiator_.in ()))
01582 {
01583 ACE_DEBUG ((LM_DEBUG,
01584 "NEGOTIATOR AVIALABLE\n"));
01585
01586 CORBA::Any_var negotiator_any = responder->get_property_value ("Negotiator");
01587
01588 AVStreams::Negotiator_ptr peer_negotiator;
01589 negotiator_any.in () >>= peer_negotiator;
01590 if (!CORBA::is_nil (peer_negotiator))
01591 {
01592 CORBA::Boolean result =
01593 this->negotiator_->negotiate (peer_negotiator,
01594 qos);
01595 if (!result)
01596 if (TAO_debug_level > 0)
01597 ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect (): negotiate failed\n"));
01598 }
01599 }
01600 }
01601 catch (const CORBA::Exception& ex)
01602 {
01603 ex._tao_print_exception ("TAO_StreamEndPoint::negotiate");
01604 }
01605
01606 try
01607 {
01608 if (this->protocols_.length () > 0)
01609 {
01610
01611 CORBA::Any_var protocols_any =
01612 responder->get_property_value ("AvailableProtocols");
01613 AVStreams::protocolSpec peer_protocols;
01614 AVStreams::protocolSpec *temp_protocols;
01615 protocols_any.in () >>= temp_protocols;
01616 peer_protocols = *temp_protocols;
01617 for (u_int i=0;i<peer_protocols.length ();i++)
01618 {
01619 for (u_int j=0;j<this->protocols_.length ();j++)
01620 if (ACE_OS::strcmp (peer_protocols [i],
01621 this->protocols_[j]) == 0)
01622 {
01623
01624 this->protocol_ = CORBA::string_dup (peer_protocols [i]);
01625 break;
01626 }
01627 }
01628 }
01629 }
01630 catch (const CORBA::Exception&)
01631 {
01632 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Availableprotocols property not defined\n"));
01633 }
01634 try
01635 {
01636 AVStreams::streamQoS network_qos;
01637 if (qos.length () > 0)
01638 {
01639 if (TAO_debug_level > 0)
01640 ACE_DEBUG ((LM_DEBUG,
01641 "QoS is Specified\n"));
01642
01643 int result = this->translate_qos (qos,
01644 network_qos);
01645 if (result != 0)
01646 if (TAO_debug_level > 0)
01647 ACE_DEBUG ((LM_DEBUG,
01648 "QoS translation failed\n"));
01649
01650 this->qos ().set (network_qos);
01651 }
01652
01653
01654 AVStreams::flowSpec flow_spec (the_spec);
01655 this->handle_preconnect (flow_spec);
01656
01657 if (TAO_debug_level > 0)
01658 ACE_DEBUG ((LM_DEBUG,
01659 "TAO_StreamEndPoint::connect: flow_spec_length = %d\n",
01660 flow_spec.length ()));
01661 u_int i;
01662 for (i=0;i<flow_spec.length ();i++)
01663 {
01664 TAO_Forward_FlowSpec_Entry *entry = 0;
01665 ACE_NEW_RETURN (entry,
01666 TAO_Forward_FlowSpec_Entry,
01667 0);
01668
01669 if (entry->parse (flow_spec[i]) == -1)
01670 return 0;
01671
01672 if (TAO_debug_level > 0)
01673 ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect: %s\n", entry->entry_to_string ()));
01674
01675 this->forward_flow_spec_set.insert (entry);
01676 }
01677
01678 int result =TAO_AV_CORE::instance ()->init_forward_flows (this,
01679 this->forward_flow_spec_set,
01680 TAO_AV_Core::TAO_AV_ENDPOINT_A,
01681 flow_spec);
01682
01683
01684 if (result < 0)
01685 ACE_ERROR_RETURN ((LM_ERROR, "%N:%l TAO_AV_Core::init_forward_flows failed\n"), 0);
01686
01687
01688 AVStreams::StreamEndPoint_var streamendpoint = this->_this ();
01689
01690 retv = responder->request_connection (streamendpoint.in (),
01691 0,
01692 network_qos,
01693 flow_spec);
01694
01695 if (TAO_debug_level > 0)
01696 ACE_DEBUG ((LM_DEBUG, "%N:%l request_connection returned %d\n", retv));
01697
01698 if (retv == 0)
01699 return retv;
01700 for (i=0;i<flow_spec.length ();i++)
01701 {
01702 TAO_Reverse_FlowSpec_Entry *entry = 0;
01703 ACE_NEW_RETURN (entry,
01704 TAO_Reverse_FlowSpec_Entry,
01705 0);
01706 if (entry->parse (flow_spec[i]) == -1)
01707 ACE_ERROR_RETURN ((LM_ERROR,
01708 "Reverse_Flow_Spec_Set::parse failed\n"),
01709 0);
01710
01711 if (TAO_debug_level > 0)
01712 ACE_DEBUG ((LM_DEBUG,
01713 "TAO_StreamEndPoint::Connect: Reverse Flow Spec %s\n",
01714 entry->entry_to_string ()));
01715
01716 this->reverse_flow_spec_set.insert (entry);
01717 }
01718
01719 result = TAO_AV_CORE::instance ()->init_reverse_flows (this,
01720 this->forward_flow_spec_set,
01721 this->reverse_flow_spec_set,
01722 TAO_AV_Core::TAO_AV_ENDPOINT_A);
01723 if (result < 0)
01724 ACE_ERROR_RETURN ((LM_ERROR,
01725 "TAO_AV_Core::init_reverse_flows failed\n"),
01726 0);
01727
01728
01729 retv = this->handle_postconnect (flow_spec);
01730 }
01731 catch (const CORBA::Exception& ex)
01732 {
01733 ex._tao_print_exception ("TAO_StreamEndPoint::connect");
01734 return 0;
01735 }
01736 return retv;
01737 }
01738
01739 int
01740 TAO_StreamEndPoint::translate_qos (const AVStreams::streamQoS& application_qos,
01741 AVStreams::streamQoS& network_qos)
01742 {
01743 u_int len = application_qos.length ();
01744 network_qos.length (len);
01745 for (u_int i=0;i<len;i++)
01746 {
01747 network_qos [i].QoSType = application_qos [i].QoSType;
01748 network_qos [i].QoSParams = application_qos [i].QoSParams;
01749 }
01750 return 0;
01751 }
01752
01753
01754
01755
01756 void
01757 TAO_StreamEndPoint::stop (const AVStreams::flowSpec &flow_spec)
01758 {
01759
01760 this->handle_stop (flow_spec);
01761
01762 if (flow_spec.length () > 0)
01763 {
01764
01765 for (u_int i=0;i<flow_spec.length ();i++)
01766 {
01767 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01768 for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01769 begin != end; ++begin)
01770 {
01771 TAO_Forward_FlowSpec_Entry entry;
01772 entry.parse (flow_spec[i]);
01773 if (ACE_OS::strcmp ((*begin)->flowname (), entry.flowname ()) == 0)
01774 {
01775 TAO_FlowSpec_Entry *entry = *begin;
01776
01777 if (entry->handler() != 0)
01778 entry->handler ()->stop (entry->role ());
01779 if (entry->control_handler () != 0)
01780 entry->control_handler ()->stop (entry->role ());
01781 break;
01782 }
01783 }
01784 }
01785 }
01786 else
01787 {
01788 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01789 for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01790 begin != end; ++begin)
01791 {
01792 TAO_FlowSpec_Entry *entry = *begin;
01793
01794 if (entry->handler() != 0)
01795 entry->handler ()->stop (entry->role ());
01796 if (entry->control_handler () != 0)
01797 entry->control_handler ()->stop (entry->role ());
01798 }
01799 }
01800 }
01801
01802
01803
01804 void
01805 TAO_StreamEndPoint::start (const AVStreams::flowSpec &flow_spec)
01806 {
01807 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::start\n"));
01808
01809 this->handle_start (flow_spec);
01810
01811 if (flow_spec.length () > 0)
01812 {
01813
01814 for (u_int i=0;i<flow_spec.length ();i++)
01815 {
01816 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01817 for (TAO_AV_FlowSpecSetItor forward_begin = this->forward_flow_spec_set.begin ();
01818 forward_begin != end; ++forward_begin)
01819 {
01820 TAO_FlowSpec_Entry *entry = *forward_begin;
01821 if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
01822 {
01823
01824 if (entry->handler () != 0)
01825 {
01826 entry->handler ()->start (entry->role ());
01827 }
01828 if (entry->control_handler () != 0)
01829 {
01830 entry->control_handler ()->start (entry->role ());
01831 }
01832 }
01833 }
01834
01835 end = this->reverse_flow_spec_set.end ();
01836 for (TAO_AV_FlowSpecSetItor reverse_begin = this->reverse_flow_spec_set.begin ();
01837 reverse_begin != end; ++reverse_begin)
01838 {
01839 TAO_FlowSpec_Entry *entry = *reverse_begin;
01840 if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
01841 {
01842
01843 if (entry->handler () != 0)
01844 {
01845 entry->handler ()->start (entry->role ());
01846 }
01847 if (entry->control_handler () != 0)
01848 {
01849 entry->control_handler ()->start (entry->role ());
01850 }
01851 }
01852 }
01853 }
01854 }
01855 else
01856 {
01857 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01858 for (TAO_AV_FlowSpecSetItor forwardbegin = this->forward_flow_spec_set.begin ();
01859 forwardbegin != end; ++forwardbegin)
01860 {
01861 TAO_FlowSpec_Entry *entry = *forwardbegin;
01862 if (entry->handler () != 0)
01863 {
01864 entry->handler ()->start (entry->role ());
01865 }
01866 if (entry->control_handler () != 0)
01867 {
01868 entry->control_handler ()->start (entry->role ());
01869 }
01870 }
01871
01872 end = this->reverse_flow_spec_set.end ();
01873 for (TAO_AV_FlowSpecSetItor reversebegin = this->reverse_flow_spec_set.begin ();
01874 reversebegin != end; ++reversebegin)
01875 {
01876 TAO_FlowSpec_Entry *entry = *reversebegin;
01877
01878 if (entry->handler () != 0)
01879 {
01880 entry->handler ()->start (entry->role ());
01881 }
01882 if (entry->control_handler () != 0)
01883 {
01884 entry->control_handler ()->start (entry->role ());
01885 }
01886 }
01887 }
01888 }
01889
01890
01891 void
01892 TAO_StreamEndPoint::destroy (const AVStreams::flowSpec &flow_spec)
01893 {
01894 CORBA::Any_var vdev_any = this->get_property_value ("Related_VDev");
01895
01896 AVStreams::VDev_ptr vdev;
01897
01898 vdev_any.in() >>= vdev;
01899 CORBA::Any_var mc_any = vdev->get_property_value ("Related_MediaCtrl");
01900
01901
01902
01903 CORBA::Object_var obj;
01904 mc_any.in() >>= CORBA::Any::to_object( obj.out() );
01905
01906 AVStreams::MediaControl_var media_ctrl =
01907 AVStreams::MediaControl::_narrow( obj.in() );
01908
01909
01910
01911 if ( !CORBA::is_nil( vdev ) )
01912 {
01913 PortableServer::ServantBase_var vdev_servant =
01914 TAO_AV_CORE::instance()->poa()->reference_to_servant ( vdev );
01915 TAO_AV_Core::deactivate_servant (vdev_servant.in());
01916 }
01917
01918 if ( !CORBA::is_nil ( media_ctrl.in () ) )
01919 {
01920 PortableServer::ServantBase_var mc_servant =
01921 TAO_AV_CORE::instance()->poa()->reference_to_servant (media_ctrl.in());
01922 TAO_AV_Core::deactivate_servant (mc_servant.in());
01923 }
01924
01925 int result = TAO_AV_Core::deactivate_servant (this);
01926 if (result < 0)
01927 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::destroy failed\n"));
01928
01929 if (flow_spec.length () > 0)
01930 {
01931 for (u_int i=0;i<flow_spec.length ();i++)
01932 {
01933 {
01934 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01935 for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01936 begin != end; ++begin)
01937 {
01938 TAO_FlowSpec_Entry *entry = *begin;
01939 TAO_Tokenizer flow_name (flow_spec [i], '\\');
01940 if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
01941 {
01942 if (entry->protocol_object ())
01943 {
01944 entry->protocol_object ()->destroy ();
01945 }
01946 break;
01947 }
01948 }
01949 }
01950 {
01951 TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
01952 for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
01953 begin != end; ++begin)
01954 {
01955 TAO_FlowSpec_Entry *entry = *begin;
01956 TAO_Tokenizer flow_name (flow_spec [i], '\\');
01957 if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
01958 {
01959 if (entry->protocol_object ())
01960 {
01961 entry->protocol_object ()->destroy ();
01962 }
01963 break;
01964 }
01965 }
01966 }
01967 }
01968 }
01969 else
01970 {
01971 {
01972 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01973 for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01974 begin != end; ++begin)
01975 {
01976 TAO_FlowSpec_Entry *entry = *begin;
01977 if (entry->protocol_object ())
01978 {
01979 entry->protocol_object ()->stop ();
01980
01981 ACE_CString control_flowname =
01982 TAO_AV_Core::get_control_flowname (entry->flowname ());
01983 TAO_AV_CORE::instance()->remove_acceptor(entry->flowname());
01984 TAO_AV_CORE::instance()->remove_acceptor(control_flowname.c_str());
01985
01986 entry->protocol_object ()->destroy ();
01987 }
01988 }
01989 }
01990 {
01991 TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
01992 for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
01993 begin != end; ++begin)
01994 {
01995 TAO_FlowSpec_Entry *entry = *begin;
01996 if (entry->protocol_object ())
01997 {
01998 entry->protocol_object ()->stop ();
01999
02000 ACE_CString control_flowname =
02001 TAO_AV_Core::get_control_flowname (entry->flowname ());
02002 TAO_AV_CORE::instance()->remove_connector(entry->flowname());
02003 TAO_AV_CORE::instance()->remove_connector(control_flowname.c_str());
02004 entry->protocol_object ()->destroy ();
02005
02006 }
02007 }
02008 }
02009 }
02010
02011
02012
02013
02014 }
02015
02016
02017
02018 CORBA::Boolean
02019 TAO_StreamEndPoint::request_connection (AVStreams::StreamEndPoint_ptr ,
02020 CORBA::Boolean ,
02021 AVStreams::streamQoS &qos,
02022 AVStreams::flowSpec &flow_spec)
02023
02024 {
02025 if (TAO_debug_level > 0)
02026 ACE_DEBUG ((LM_DEBUG,
02027 "\n(%P|%t) TAO_StreamEndPoint::request_connection called"));
02028
02029 int result = 0;
02030 try
02031 {
02032 AVStreams::streamQoS network_qos;
02033 if (qos.length () > 0)
02034 {
02035 if (TAO_debug_level > 0)
02036 ACE_DEBUG ((LM_DEBUG,
02037 "QoS is Specified\n"));
02038
02039 int result = this->translate_qos (qos, network_qos);
02040 if (result != 0)
02041 if (TAO_debug_level > 0)
02042 ACE_DEBUG ((LM_DEBUG, "QoS translation failed\n"));
02043
02044 this->qos ().set (network_qos);
02045 }
02046
02047 if (TAO_debug_level > 0)
02048 ACE_DEBUG ((LM_DEBUG,
02049 "\n(%P|%t) TAO_StreamEndPoint::request_connection: "
02050 "flowspec has length = %d and the strings are:\n",
02051 flow_spec.length ()));
02052 CORBA::ULong i;
02053
02054 for (i=0;i<flow_spec.length ();i++)
02055 {
02056 TAO_Forward_FlowSpec_Entry *entry = 0;
02057 ACE_NEW_RETURN (entry,
02058 TAO_Forward_FlowSpec_Entry,
02059 0);
02060
02061 CORBA::String_var string_entry = CORBA::string_dup (flow_spec[i]);
02062
02063 if(TAO_debug_level > 0)
02064 ACE_DEBUG(( LM_DEBUG,
02065 "%N:%l Parsing flow spec: [%s]\n",
02066 string_entry.in ()));
02067
02068 if (entry->parse (string_entry.in ()) == -1)
02069 {
02070 if (TAO_debug_level > 0)
02071 ACE_DEBUG ((LM_DEBUG,
02072 "%N:%l Error parsing flow_spec: [%s]\n",
02073 string_entry.in ()));
02074 return 0;
02075 }
02076 if (TAO_debug_level > 0)
02077 ACE_DEBUG ((LM_DEBUG,
02078 "TAO_StreamEndPoint::request_connection flow spec [%s]\n",
02079 entry->entry_to_string ()));
02080
02081 this->forward_flow_spec_set.insert (entry);
02082 }
02083
02084 result = TAO_AV_CORE::instance ()->init_forward_flows (this,
02085 this->forward_flow_spec_set,
02086 TAO_AV_Core::TAO_AV_ENDPOINT_B,
02087 flow_spec);
02088
02089 if (result < 0)
02090 return 0;
02091
02092
02093 result = this->handle_connection_requested (flow_spec);
02094 }
02095 catch (const CORBA::Exception& ex)
02096 {
02097 ex._tao_print_exception ("TAO_StreamEndpoint::request_connection");
02098 return 0;
02099 }
02100 return result;
02101 }
02102
02103 int
02104 TAO_StreamEndPoint::change_qos (AVStreams::streamQoS &new_qos,
02105 const AVStreams::flowSpec &the_flows)
02106 {
02107 if (TAO_debug_level > 0)
02108 ACE_DEBUG ((LM_DEBUG,
02109 "TAO_StreamEndPoint::change_qos\n"));
02110
02111 TAO_AV_QoS qos (new_qos);
02112 for (int i = 0; (unsigned) i < the_flows.length (); i++)
02113 {
02114 TAO_Forward_FlowSpec_Entry entry;
02115 entry.parse (the_flows [i]);
02116 ACE_CString flow_name_key (entry.flowname ());
02117 Flow_Handler_Map_Entry *handler_entry;
02118 if (this->flow_handler_map_.find (flow_name_key,
02119 handler_entry) == 0)
02120 {
02121 AVStreams::QoS flow_qos;
02122 if (qos.get_flow_qos (entry.flowname (), flow_qos) != 0)
02123 ACE_DEBUG ((LM_DEBUG,
02124 "New QoS for the flow %s is not specified\n",
02125 entry.flowname ()));
02126 int result;
02127 result = handler_entry->int_id_->change_qos (flow_qos);
02128 if (result != 0)
02129 ACE_ERROR_RETURN ((LM_ERROR,
02130 "Modifying QoS Failed\n"),
02131 -1);
02132
02133 }
02134 }
02135 return 0;
02136 }
02137
02138
02139 CORBA::Boolean
02140 TAO_StreamEndPoint::modify_QoS (AVStreams::streamQoS &new_qos,
02141 const AVStreams::flowSpec &the_flows)
02142 {
02143 if (TAO_debug_level > 0)
02144 ACE_DEBUG ((LM_DEBUG,
02145 "TAO_StreamEndPoint::modify_QoS\n"));
02146
02147 int result = this->change_qos (new_qos, the_flows);
02148
02149 if (result != 0)
02150 return 0;
02151
02152 return 1;
02153
02154 }
02155
02156
02157
02158 CORBA::Boolean
02159 TAO_StreamEndPoint::set_protocol_restriction (const AVStreams::protocolSpec &protocols)
02160 {
02161 try
02162 {
02163 CORBA::Any protocol_restriction_any;
02164
02165 protocol_restriction_any <<= protocols;
02166 this->define_property ("ProtocolRestriction",
02167 protocol_restriction_any);
02168 this->protocols_ = protocols;
02169 }
02170 catch (const CORBA::Exception& ex)
02171 {
02172 ex._tao_print_exception (
02173 "TAO_StreamEndPoint::set_protocol_restriction");
02174 return 0;
02175 }
02176 return 1;
02177 }
02178
02179
02180 void
02181 TAO_StreamEndPoint::disconnect (const AVStreams::flowSpec &the_spec)
02182 {
02183 ACE_UNUSED_ARG (the_spec);
02184 }
02185
02186
02187
02188 void
02189 TAO_StreamEndPoint::set_FPStatus (const AVStreams::flowSpec &,
02190 const char *fp_name,
02191 const CORBA::Any &fp_settings)
02192 {
02193 if (ACE_OS::strcmp (fp_name, "SFP1.0") != 0)
02194 return;
02195 fp_settings >>= this->sfp_status_;
02196
02197 }
02198
02199
02200 CORBA::Object_ptr
02201 TAO_StreamEndPoint::get_fep (const char *flow_name)
02202 {
02203 ACE_CString fep_name_key (flow_name);
02204 AVStreams::FlowEndPoint_var fep_entry;
02205 if (this->fep_map_.find (fep_name_key, fep_entry) == 0)
02206 return fep_entry._retn();
02207 return 0;
02208 }
02209
02210 char*
02211 TAO_StreamEndPoint::add_fep_i_add_property (AVStreams::FlowEndPoint_ptr fep)
02212 {
02213 ACE_CString flow_name;
02214
02215 try
02216 {
02217
02218
02219 flow_name = "flow";
02220 char tmp[255];
02221 ACE_OS::sprintf (tmp, "%u", this->flow_num_++);
02222 flow_name += tmp;
02223
02224 CORBA::Any flowname_any;
02225 flowname_any <<= flow_name.c_str ();
02226 fep->define_property ("Flow",
02227 flowname_any);
02228 }
02229 catch (const CORBA::Exception& ex)
02230 {
02231 ex._tao_print_exception ("TAO_StreamEndPoint::add_fep");
02232 return 0;
02233 }
02234 return ACE_OS::strdup( flow_name.c_str () );
02235 }
02236
02237 char*
02238 TAO_StreamEndPoint::add_fep_i (AVStreams::FlowEndPoint_ptr fep)
02239 {
02240 CORBA::String_var flow_name;
02241 try
02242 {
02243 CORBA::Any_var flow_name_any =
02244 fep->get_property_value ("FlowName");
02245
02246 const char *tmp;
02247 flow_name_any >>= tmp;
02248 flow_name = CORBA::string_dup (tmp);
02249 }
02250 catch (const CORBA::Exception&)
02251 {
02252 flow_name =
02253 this->add_fep_i_add_property (fep);
02254 }
02255 return flow_name._retn ();
02256 }
02257
02258 char *
02259 TAO_StreamEndPoint::add_fep (CORBA::Object_ptr fep_obj)
02260 {
02261 AVStreams::FlowEndPoint_var fep =
02262 AVStreams::FlowEndPoint::_narrow (fep_obj);
02263
02264 CORBA::String_var flow_name =
02265 this->add_fep_i (fep.in ());
02266
02267 try
02268 {
02269 fep->lock ();
02270
02271
02272 ACE_CString fep_name_key (CORBA::string_dup (flow_name.in ()));
02273 if (this->fep_map_.bind (fep_name_key, AVStreams::FlowEndPoint::_duplicate (fep.in ())) != 0)
02274 {
02275 throw AVStreams::streamOpFailed ();
02276 }
02277
02278 this->flow_count_++;
02279 this->flows_.length (this->flow_count_);
02280 this->flows_[this->flow_count_-1] = flow_name;
02281
02282 CORBA::Any flows_any;
02283 flows_any <<= this->flows_;
02284 this->define_property ("Flows",
02285 flows_any);
02286 }
02287 catch (const CORBA::Exception& ex)
02288 {
02289 ex._tao_print_exception ("TAO_StreamEndPoint::add_fep");
02290 return 0;
02291 }
02292 return flow_name._retn ();
02293 }
02294
02295
02296 void
02297 TAO_StreamEndPoint::remove_fep (const char *flow_name)
02298 {
02299 try
02300 {
02301 ACE_CString fep_name_key (flow_name);
02302 AVStreams::FlowEndPoint_var fep_entry;
02303
02304 if (this->fep_map_.unbind (fep_name_key, fep_entry)!= 0)
02305 throw AVStreams::streamOpFailed ();
02306
02307 AVStreams::flowSpec new_flows (this->flows_.length ());
02308 for (u_int i=0, j=0 ; i <this->flows_.length (); i++)
02309 if (ACE_OS::strcmp (flow_name, this->flows_[i]) != 0)
02310 new_flows[j++] = this->flows_[i];
02311
02312 CORBA::Any flows;
02313 flows <<= new_flows;
02314 this->flows_ = new_flows;
02315 this->define_property ("Flows",
02316 flows);
02317 }
02318 catch (const CORBA::Exception& ex)
02319 {
02320 ex._tao_print_exception ("TAO_StreamEndPoint::remove_fep");
02321 }
02322 }
02323
02324
02325 void
02326 TAO_StreamEndPoint::set_negotiator (AVStreams::Negotiator_ptr new_negotiator)
02327 {
02328 try
02329 {
02330 CORBA::Any negotiator;
02331 negotiator <<= new_negotiator;
02332 this->define_property ("Negotiator",
02333 negotiator);
02334 this->negotiator_ = AVStreams::Negotiator::_duplicate (new_negotiator);
02335 }
02336 catch (const CORBA::Exception& ex)
02337 {
02338 ex._tao_print_exception (
02339 "TAO_StreamEndPoint::set_negotiator");
02340 }
02341 }
02342
02343
02344
02345 void
02346 TAO_StreamEndPoint::set_key (const char *flow_name,
02347 const AVStreams::key & the_key)
02348 {
02349 try
02350 {
02351 this->key_ = the_key;
02352 CORBA::Any PublicKey;
02353 PublicKey <<= the_key;
02354 char PublicKey_property [BUFSIZ];
02355 ACE_OS::sprintf (PublicKey_property, "%s_PublicKey", flow_name);
02356 this->define_property (PublicKey_property,
02357 PublicKey);
02358 }
02359 catch (const CORBA::Exception& ex)
02360 {
02361 ex._tao_print_exception ("TAO_StreamEndPoint::set_key");
02362 }
02363 }
02364
02365
02366 void
02367 TAO_StreamEndPoint::set_source_id (CORBA::Long source_id)
02368 {
02369 this->source_id_ = source_id;
02370 }
02371
02372 CORBA::Boolean
02373 TAO_StreamEndPoint::multiconnect (AVStreams::streamQoS &,
02374 AVStreams::flowSpec &)
02375 {
02376 if (TAO_debug_level > 0)
02377 ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::multiconnect\n"));
02378 return 0;
02379 }
02380
02381 TAO_StreamEndPoint::~TAO_StreamEndPoint (void)
02382 {
02383
02384 TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
02385 TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02386
02387 int i=0;
02388
02389
02390 for ( ; begin != end; ++begin, ++i)
02391 {
02392
02393
02394 TAO_FlowSpec_Entry *entry = *begin;
02395 delete entry;
02396
02397 }
02398 begin = this->reverse_flow_spec_set.begin ();
02399 end = this->reverse_flow_spec_set.end ();
02400 i = 0;
02401 for (; begin != end; ++begin)
02402 {
02403
02404
02405 TAO_FlowSpec_Entry *entry = *begin;
02406 delete entry;
02407
02408 }
02409 }
02410
02411
02412
02413
02414
02415
02416 TAO_StreamEndPoint_A::TAO_StreamEndPoint_A (void)
02417 {
02418 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_StreamEndPoint_A::TAO_StreamEndPoint_A: created\n"));
02419 }
02420
02421
02422 CORBA::Boolean
02423 TAO_StreamEndPoint_A::multiconnect (AVStreams::streamQoS &stream_qos,
02424 AVStreams::flowSpec &flow_spec)
02425 {
02426 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPointA::multiconnect\n"));
02427 try
02428 {
02429 int result = 0;
02430 TAO_AV_QoS qos (stream_qos);
02431 for (u_int i=0;i< flow_spec.length ();i++)
02432 {
02433 TAO_Forward_FlowSpec_Entry *forward_entry = 0;
02434 ACE_NEW_RETURN (forward_entry,
02435 TAO_Forward_FlowSpec_Entry,
02436 0);
02437 forward_entry->parse (flow_spec[i]);
02438 ACE_CString mcast_key (forward_entry->flowname ());
02439 AVStreams::FlowEndPoint_var flow_endpoint;
02440
02441
02442
02443
02444
02445
02446
02447
02448
02449 if (this->fep_map_.find (mcast_key, flow_endpoint) == 0)
02450 {
02451 try
02452 {
02453 AVStreams::QoS flow_qos;
02454 result = qos.get_flow_qos (forward_entry->flowname (), flow_qos);
02455 if (result < 0)
02456 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "QoS not found for %s\n", forward_entry->flowname ()));
02457
02458 AVStreams::FlowProducer_var producer;
02459 producer = AVStreams::FlowProducer::_narrow (flow_endpoint.in());
02460
02461
02462 if (!CORBA::is_nil (producer.in ()))
02463 {
02464 AVStreams::FlowConnection_var flow_connection;
02465 try
02466 {
02467 if (CORBA::is_nil (this->streamctrl_.in ()))
02468 {
02469 CORBA::Any_var streamctrl_any;
02470 streamctrl_any = this->get_property_value ("Related_StreamCtrl");
02471 AVStreams::StreamCtrl_ptr streamctrl;
02472 streamctrl_any.in () >>= streamctrl;
02473 this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (streamctrl);
02474 }
02475
02476 CORBA::Object_var flow_connection_obj =
02477 this->streamctrl_->get_flow_connection (forward_entry->flowname ());
02478 flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in ());
02479 }
02480 catch (const CORBA::Exception&)
02481 {
02482 TAO_FlowConnection *flowConnection;
02483 ACE_NEW_RETURN (flowConnection,
02484 TAO_FlowConnection,
02485 0);
02486
02487 flowConnection->set_mcast_addr (this->mcast_addr_, this->mcast_port_);
02488 this->mcast_port_++;
02489 flowConnection->set_protocol (forward_entry->carrier_protocol_str ());
02490 flow_connection = flowConnection->_this ();
02491 this->streamctrl_->set_flow_connection (forward_entry->flowname (),
02492 flow_connection.in ());
02493 }
02494 if (ACE_OS::strcmp (forward_entry->flow_protocol_str (), "") != 0)
02495 {
02496 CORBA::Any fp_settings;
02497 flow_connection->use_flow_protocol (forward_entry->flow_protocol_str (),
02498 fp_settings);
02499 }
02500 result = flow_connection->add_producer (producer.in (),
02501 flow_qos);
02502 if (result == 0)
02503 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_A::multiconnect: add_producer failed\n"), 0);
02504 }
02505 }
02506 catch (const CORBA::Exception& ex)
02507 {
02508
02509 ex._tao_print_exception (
02510 "FlowProducer::_narrow");
02511 ACE_ERROR_RETURN ((LM_ERROR, "sep_a doesn't contain a flowproducer"), 0);
02512 }
02513 }
02514 else
02515 {
02516 ACE_INET_Addr *mcast_addr;
02517 TAO_FlowSpec_Entry *entry = 0;
02518 result = this->mcast_entry_map_.find (mcast_key, entry);
02519 if (result == 0)
02520 {
02521 mcast_addr = dynamic_cast<ACE_INET_Addr *> (entry->address ());
02522 char str_addr [BUFSIZ];
02523 result = mcast_addr->addr_to_string (str_addr, BUFSIZ);
02524 if (result < 0)
02525 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPointA::multiconnect ::addr_to_string failed\n"), 0);
02526 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint_A::multiconnect:%s\n", str_addr));
02527 TAO_Forward_FlowSpec_Entry new_entry (entry->flowname (),
02528 entry->direction_str (),
02529 entry->format (),
02530 entry->flow_protocol_str (),
02531 entry->carrier_protocol_str (),
02532 entry->address ());
02533 flow_spec[i] = CORBA::string_dup (new_entry.entry_to_string ());
02534 }
02535 else
02536 {
02537
02538 switch (forward_entry->direction ())
02539 {
02540 case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
02541 {
02542 ACE_NEW_RETURN (mcast_addr,
02543 ACE_INET_Addr,
02544 0);
02545 mcast_addr->set (this->mcast_port_, this->mcast_addr_.c_str ());
02546 this->mcast_port_++;
02547 char buf[BUFSIZ];
02548 mcast_addr->addr_to_string (buf, BUFSIZ);
02549 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", buf));
02550 TAO_Forward_FlowSpec_Entry *new_entry;
02551 ACE_NEW_RETURN (new_entry,
02552 TAO_Forward_FlowSpec_Entry (forward_entry->flowname (),
02553 forward_entry->direction_str (),
02554 forward_entry->format (),
02555 forward_entry->flow_protocol_str (),
02556 forward_entry->carrier_protocol_str (),
02557 mcast_addr),
02558 0);
02559 flow_spec[i] = CORBA::string_dup (new_entry->entry_to_string ());
02560
02561
02562 this->forward_flow_spec_set.insert (new_entry);
02563 TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry ();
02564 result = acceptor_registry->open (this,
02565 TAO_AV_CORE::instance (),
02566 this->forward_flow_spec_set);
02567 if (result < 0)
02568 ACE_ERROR_RETURN ((LM_ERROR, "Acceptor_Registry::open failed\n"), 0);
02569 result = this->mcast_entry_map_.bind (mcast_key, new_entry);
02570 if (result < 0)
02571 ACE_ERROR_RETURN ((LM_ERROR, "mcast_entry::bind failed"), 0);
02572 }
02573 break;
02574 case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
02575
02576 break;
02577 default:
02578 break;
02579 }
02580 }
02581 }
02582 }
02583 }
02584 catch (const CORBA::Exception& ex)
02585 {
02586 ex._tao_print_exception (
02587 "TAO_StreamEndPoint_A::multiconnect");
02588 return 0;
02589 }
02590 return 1;
02591 }
02592
02593
02594 CORBA::Boolean
02595 TAO_StreamEndPoint_A::connect_leaf (AVStreams::StreamEndPoint_B_ptr ,
02596 AVStreams::streamQoS & ,
02597 const AVStreams::flowSpec & )
02598 {
02599 throw AVStreams::notSupported ();
02600 }
02601
02602
02603 void
02604 TAO_StreamEndPoint_A::disconnect_leaf (AVStreams::StreamEndPoint_B_ptr ,
02605 const AVStreams::flowSpec & )
02606
02607 {
02608
02609 throw AVStreams::notSupported ();
02610
02611 }
02612
02613 TAO_StreamEndPoint_A::~TAO_StreamEndPoint_A (void)
02614 {
02615 }
02616
02617
02618
02619
02620
02621 TAO_StreamEndPoint_B::TAO_StreamEndPoint_B (void)
02622 {
02623 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
02624 "\n(%P|%t) TAO_StreamEndPoint_B::TAO_StreamEndPoint_B: created"));
02625 }
02626
02627 CORBA::Boolean
02628 TAO_StreamEndPoint_B::multiconnect (AVStreams::streamQoS &stream_qos,
02629 AVStreams::flowSpec &flow_spec)
02630 {
02631 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint_B::multiconnect\n"));
02632 try
02633 {
02634 int result = 0;
02635 TAO_AV_QoS qos (stream_qos);
02636 for (u_int i=0;i< flow_spec.length ();i++)
02637 {
02638 TAO_Forward_FlowSpec_Entry *forward_entry;
02639 ACE_NEW_RETURN (forward_entry,
02640 TAO_Forward_FlowSpec_Entry,
02641 0);
02642 forward_entry->parse (flow_spec[i]);
02643 ACE_CString mcast_key (forward_entry->flowname ());
02644 AVStreams::FlowEndPoint_var flow_endpoint;
02645 if (this->fep_map_.find (mcast_key, flow_endpoint ) == 0)
02646 {
02647 AVStreams::FlowConsumer_var consumer;
02648 try
02649 {
02650 consumer = AVStreams::FlowConsumer::_narrow (flow_endpoint.in ());
02651 }
02652 catch (const CORBA::Exception& ex)
02653 {
02654 ex._tao_print_exception (
02655 "FlowConsumer::_narrow");
02656 ACE_ERROR_RETURN ((LM_ERROR, "sep_b doesn't contain a flowconsumer"), 0);
02657 }
02658 AVStreams::QoS flow_qos;
02659 result = qos.get_flow_qos (forward_entry->flowname (), flow_qos);
02660 if (result < 0)
02661 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "QoS not found for %s", forward_entry->flowname ()));
02662 AVStreams::FlowConnection_var flow_connection;
02663 try
02664 {
02665 if (CORBA::is_nil (this->streamctrl_.in ()))
02666 {
02667 CORBA::Any_var streamctrl_any;
02668 streamctrl_any = this->get_property_value ("Related_StreamCtrl");
02669 AVStreams::StreamCtrl_ptr streamctrl;
02670 streamctrl_any.in () >>= streamctrl;
02671 this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (streamctrl);
02672 }
02673 CORBA::Object_var flow_connection_obj =
02674 this->streamctrl_->get_flow_connection (forward_entry->flowname ());
02675 flow_connection = AVStreams::FlowConnection::_narrow (flow_connection_obj.in ());
02676 }
02677 catch (const CORBA::Exception& ex)
02678 {
02679 ex._tao_print_exception (
02680 "TAO_StreamEndPoint_B::multiconnect::get_flow_connection");
02681 return 0;
02682 }
02683 result = flow_connection->add_consumer (consumer.in (),
02684 flow_qos);
02685 if (result == 0)
02686 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect:add_consumer failed\n"), 0);
02687 }
02688 else
02689 {
02690 TAO_FlowSpec_Entry *mcast_entry = 0;
02691 ACE_INET_Addr *mcast_addr;
02692 mcast_addr = dynamic_cast<ACE_INET_Addr *> (forward_entry->address ());
02693 if (mcast_addr == 0)
02694 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect::Address missing in flowspec_entry\n"), 0);
02695 result = this->mcast_entry_map_.find (mcast_key, mcast_entry);
02696 if (result == 0)
02697 {
02698 ACE_ERROR_RETURN ((LM_ERROR, "TAO_StreamEndPoint_B::multiconnect::handler already found\n"), 0);
02699 }
02700 else
02701 {
02702 switch (forward_entry->direction ())
02703 {
02704 case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
02705 {
02706
02707
02708
02709
02710
02711
02712 this->forward_flow_spec_set.insert (forward_entry);
02713 TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry ();
02714 result = connector_registry->open (this,
02715 TAO_AV_CORE::instance (),
02716 this->forward_flow_spec_set);
02717 if (result < 0)
02718 ACE_ERROR_RETURN ((LM_ERROR, "connector_registry::open failed\n"), 0);
02719 result = this->mcast_entry_map_.bind (mcast_key, forward_entry);
02720 if (result < 0)
02721 ACE_ERROR_RETURN ((LM_ERROR, "dgram_mcast_handler::bind failed"), 0);
02722 }
02723 break;
02724 case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
02725
02726 break;
02727 default:
02728 break;
02729 }
02730 }
02731 }
02732 }
02733 }
02734 catch (const CORBA::Exception& ex)
02735 {
02736 ex._tao_print_exception (
02737 "TAO_StreamEndPoint_B::multiconnect");
02738 return 0;
02739 }
02740 return 1;
02741 }
02742
02743 TAO_StreamEndPoint_B::~TAO_StreamEndPoint_B (void)
02744 {
02745 }
02746
02747
02748
02749
02750
02751 TAO_VDev::TAO_VDev (void)
02752 {
02753 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
02754 "(%P|%t) TAO_VDev::TAO_VDev: created\n"));
02755 }
02756
02757
02758
02759 CORBA::Boolean
02760 TAO_VDev::set_peer (AVStreams::StreamCtrl_ptr the_ctrl,
02761 AVStreams::VDev_ptr the_peer_dev,
02762 AVStreams::streamQoS &the_qos,
02763 const AVStreams::flowSpec &the_spec)
02764 {
02765 ACE_UNUSED_ARG (the_qos);
02766 ACE_UNUSED_ARG (the_spec);
02767
02768 CORBA::Boolean result = 0;
02769 try
02770 {
02771 if (TAO_debug_level > 0)
02772 ACE_DEBUG ((LM_DEBUG, "(%P|%t) TAO_VDev::set_peer: called\n"));
02773
02774
02775 CORBA::Any anyval;
02776 anyval <<= the_peer_dev;
02777 this->define_property ("Related_VDev",
02778 anyval);
02779
02780
02781 this->streamctrl_ = AVStreams::StreamCtrl::_duplicate (the_ctrl);
02782 this->peer_ = AVStreams::VDev::_duplicate (the_peer_dev);
02783
02784 CORBA::Any_var anyptr;
02785 anyptr = this->peer_->get_property_value ("Related_MediaCtrl");
02786
02787 CORBA::Object_ptr media_ctrl_obj = 0;
02788
02789 anyptr.in () >>= CORBA::Any::to_object(media_ctrl_obj);
02790
02791 result = this->set_media_ctrl (media_ctrl_obj);
02792 }
02793 catch (const CORBA::Exception& ex)
02794 {
02795 ex._tao_print_exception ("TAO_VDev::set_peer");
02796 return 0;
02797 }
02798 return result;
02799 }
02800
02801 CORBA::Boolean
02802 TAO_VDev::set_media_ctrl (CORBA::Object_ptr media_ctrl)
02803
02804 {
02805
02806
02807 CORBA::release( media_ctrl);
02808
02809 return 1;
02810 }
02811
02812
02813 CORBA::Boolean
02814 TAO_VDev::set_Mcast_peer (AVStreams::StreamCtrl_ptr ,
02815 AVStreams::MCastConfigIf_ptr mcast_peer,
02816 AVStreams::streamQoS &,
02817 const AVStreams::flowSpec &)
02818 {
02819 this->mcast_peer_ = AVStreams::MCastConfigIf::_duplicate (mcast_peer);
02820 return 1;
02821 }
02822
02823
02824 void
02825 TAO_VDev::configure (const CosPropertyService::Property &)
02826 {
02827 }
02828
02829
02830 void
02831 TAO_VDev::set_format (const char *flowName,
02832 const char *format_name)
02833 {
02834 try
02835 {
02836 if (flowName == 0 || format_name == 0)
02837 ACE_ERROR ((LM_ERROR, "TAO_VDev::set_format: flowName or format_name is null\n"));
02838 char format_property [BUFSIZ];
02839 ACE_OS::sprintf (format_property, "%s_currFormat", flowName);
02840 CORBA::Any format;
02841 format <<= format_name;
02842 this->define_property (format_property,
02843 format);
02844 }
02845 catch (const CORBA::Exception& ex)
02846 {
02847 ex._tao_print_exception ("TAO_VDev::set_format");
02848 return;
02849 }
02850 return;
02851 }
02852
02853
02854 void
02855 TAO_VDev::set_dev_params (const char *flowName,
02856 const CosPropertyService::Properties &new_params)
02857 {
02858 try
02859 {
02860 if (flowName == 0)
02861 ACE_ERROR ((LM_ERROR, "TAO_VDev::set_dev_params:flowName is null\n"));
02862 char devParams_property[BUFSIZ];
02863 ACE_OS::sprintf (devParams_property, "%s_devParams", flowName);
02864 CORBA::Any devParams;
02865 devParams <<= new_params;
02866 this->define_property (devParams_property,
02867 devParams);
02868 }
02869 catch (const CORBA::Exception& ex)
02870 {
02871 ex._tao_print_exception ("TAO_VDev::set_dev_params");
02872 return;
02873 }
02874 return;
02875 }
02876
02877
02878 CORBA::Boolean
02879 TAO_VDev::modify_QoS (AVStreams::streamQoS &the_qos,
02880 const AVStreams::flowSpec &flowspec)
02881 {
02882 if (TAO_debug_level > 0)
02883 ACE_DEBUG ((LM_DEBUG,
02884 "TAO_VDev::modify_QoS\n"));
02885
02886 if (flowspec.length () != 0)
02887 {
02888 TAO_Forward_FlowSpec_Entry entry;
02889 entry.parse (flowspec [0]);
02890 int direction = entry.direction ();
02891 if (direction == 0)
02892 {
02893 AVStreams::StreamEndPoint_A_ptr sep_a;
02894
02895 CORBA::Any_ptr streamendpoint_a_any =
02896 this->get_property_value ("Related_StreamEndpoint");
02897
02898 *streamendpoint_a_any >>= sep_a;
02899 if (sep_a != 0)
02900 {
02901 sep_a->modify_QoS (the_qos, flowspec);
02902 }
02903 else ACE_DEBUG ((LM_DEBUG,
02904 "Stream EndPoint Not Found\n"));
02905 }
02906 else
02907 {
02908 AVStreams::StreamEndPoint_B_ptr sep_b;
02909
02910 CORBA::Any_ptr streamendpoint_b_any =
02911 this->get_property_value ("Related_StreamEndpoint");
02912 *streamendpoint_b_any >>= sep_b;
02913 sep_b->modify_QoS (the_qos, flowspec);
02914 }
02915 }
02916 return 1;
02917 }
02918
02919 TAO_VDev::~TAO_VDev (void)
02920 {
02921 }
02922
02923
02924
02925
02926
02927
02928 TAO_MMDevice::TAO_MMDevice (TAO_AV_Endpoint_Strategy *endpoint_strategy)
02929 : endpoint_strategy_ (endpoint_strategy),
02930 flow_count_ (0),
02931 flow_num_ (0),
02932 stream_ctrl_ (0)
02933 {
02934 }
02935
02936
02937
02938 AVStreams::StreamCtrl_ptr
02939 TAO_MMDevice::bind (AVStreams::MMDevice_ptr peer_device,
02940 AVStreams::streamQoS &the_qos,
02941 CORBA::Boolean_out is_met,
02942 const AVStreams::flowSpec &the_spec)
02943 {
02944 AVStreams::StreamCtrl_ptr streamctrl (AVStreams::StreamCtrl::_nil ());
02945 try
02946 {
02947 ACE_UNUSED_ARG (is_met);
02948 ACE_NEW_RETURN (this->stream_ctrl_,
02949 TAO_StreamCtrl,
02950 0);
02951 AVStreams::MMDevice_var mmdevice = this->_this ();
02952 this->stream_ctrl_->bind_devs (peer_device,
02953 mmdevice.in (),
02954 the_qos,
02955 the_spec);
02956 streamctrl = this->stream_ctrl_->_this ();
02957 }
02958 catch (const CORBA::Exception& ex)
02959 {
02960 ex._tao_print_exception ("TAO_MMDevice::bind");
02961 return streamctrl;
02962 }
02963 return streamctrl;
02964 }
02965
02966
02967 AVStreams::StreamCtrl_ptr
02968 TAO_MMDevice::bind_mcast (AVStreams::MMDevice_ptr first_peer,
02969 AVStreams::streamQoS &the_qos,
02970 CORBA::Boolean_out is_met,
02971 const AVStreams::flowSpec &the_spec)
02972 {
02973 ACE_UNUSED_ARG (first_peer);
02974 ACE_UNUSED_ARG (the_qos);
02975 ACE_UNUSED_ARG (is_met);
02976 ACE_UNUSED_ARG (the_spec);
02977
02978 return 0;
02979 }
02980
02981 AVStreams::StreamEndPoint_ptr
02982 TAO_MMDevice::create_A_B (MMDevice_Type type,
02983 AVStreams::StreamCtrl_ptr streamctrl,
02984 AVStreams::VDev_out the_vdev,
02985 AVStreams::streamQoS &the_qos,
02986 CORBA::Boolean_out met_qos,
02987 char *&,
02988 const AVStreams::flowSpec &flow_spec)
02989 {
02990 AVStreams::StreamEndPoint_A_ptr sep_a (AVStreams::StreamEndPoint_A::_nil ());
02991 AVStreams::StreamEndPoint_B_ptr sep_b (AVStreams::StreamEndPoint_B::_nil ());
02992 AVStreams::StreamEndPoint_ptr sep (AVStreams::StreamEndPoint::_nil ());
02993 try
02994 {
02995 switch (type)
02996 {
02997 case MMDEVICE_A:
02998 {
02999 if (this->endpoint_strategy_->create_A (sep_a,
03000 the_vdev) == -1)
03001 ACE_ERROR_RETURN ((LM_ERROR,
03002 "TAO_MMDevice::create_A_B (%P|%t) - "
03003 "error in create_A\n"),
03004 0);
03005 sep = sep_a;
03006 }
03007 break;
03008 case MMDEVICE_B:
03009 {
03010 if (this->endpoint_strategy_->create_B (sep_b,
03011 the_vdev) == -1)
03012 ACE_ERROR_RETURN ((LM_ERROR,
03013 "TAO_MMDevice::create_A_B (%P|%t) - "
03014 "error in create_B\n"),
03015 0);
03016 sep = sep_b;
03017 }
03018 break;
03019 default:
03020 break;
03021 }
03022 if (this->fdev_map_.current_size () > 0)
03023 {
03024 TAO_AV_QoS qos (the_qos);
03025
03026 for (u_int i=0;i<flow_spec.length ();i++)
03027 {
03028 TAO_Forward_FlowSpec_Entry forward_entry;
03029 forward_entry.parse (flow_spec[i]);
03030 ACE_CString flow_key (forward_entry.flowname ());
03031 AVStreams::FDev_var flow_dev;
03032 AVStreams::FlowConnection_var flowconnection;
03033 try
03034 {
03035
03036
03037 CORBA::Object_var flowconnection_obj =
03038 streamctrl->get_flow_connection (forward_entry.flowname ());
03039 ACE_OS::printf("successfully called get_flow_connection\n");
03040 if (!CORBA::is_nil (flowconnection_obj.in ()))
03041 {
03042 flowconnection = AVStreams::FlowConnection::_narrow (flowconnection_obj.in ());
03043 }
03044 }
03045 catch (const AVStreams::noSuchFlow&)
03046 {
03047 TAO_FlowConnection *flowConnection;
03048 ACE_NEW_RETURN (flowConnection,
03049 TAO_FlowConnection,
03050 0);
03051 flowconnection = flowConnection->_this ();
03052 streamctrl->set_flow_connection (forward_entry.flowname(),
03053 flowconnection.in ());
03054 }
03055 catch (const CORBA::Exception& ex)
03056 {
03057
03058 ex._tao_print_exception (
03059 "TAO_MMDevice::create_a::get_flow_connection");
03060 }
03061
03062 int result = this->fdev_map_.find (flow_key, flow_dev);
03063 if (result < 0)
03064 ACE_ERROR_RETURN ((LM_ERROR, "(%N,%l) fdev_map::find failed\n"), 0);
03065
03066 CORBA::String_var named_fdev;
03067 AVStreams::FlowEndPoint_var flow_endpoint;
03068 AVStreams::QoS flow_qos;
03069 result = qos.get_flow_qos (forward_entry.flowname (), flow_qos);
03070 if (result < 0)
03071 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "(%N,%l) get_flow_qos failed for %s\n", forward_entry.flowname ()));
03072 switch (forward_entry.direction ())
03073 {
03074 case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
03075 {
03076 switch (type)
03077 {
03078 case MMDEVICE_A:
03079 {
03080
03081
03082
03083 flow_endpoint =
03084 flow_dev->create_producer (flowconnection.in (),
03085 flow_qos,
03086 met_qos,
03087 named_fdev.inout ());
03088 }
03089 break;
03090 case MMDEVICE_B:
03091 {
03092 flow_endpoint =
03093 flow_dev->create_consumer (flowconnection.in (),
03094 flow_qos,
03095 met_qos,
03096 named_fdev.inout ());
03097 }
03098 break;
03099 }
03100 }
03101 break;
03102 case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
03103 {
03104 switch (type)
03105 {
03106 case MMDEVICE_A:
03107 {
03108
03109
03110
03111 flow_endpoint =
03112 flow_dev->create_consumer (flowconnection.in (),
03113 flow_qos,
03114 met_qos,
03115 named_fdev.inout ());
03116 }
03117 break;
03118 case MMDEVICE_B:
03119 {
03120
03121
03122
03123 flow_endpoint =
03124 flow_dev->create_producer (flowconnection.in (),
03125 flow_qos,
03126 met_qos,
03127 named_fdev.inout ());
03128 }
03129 break;
03130 }
03131 }
03132 break;
03133 default:
03134 break;
03135 }
03136 CORBA::Any flowname_any;
03137 flowname_any <<= forward_entry.flowname ();
03138 flow_endpoint->define_property ("FlowName", flowname_any);
03139 sep->add_fep (flow_endpoint.in ());
03140 }
03141 }
03142 }
03143 catch (const CORBA::Exception& ex)
03144 {
03145 ex._tao_print_exception ("TAO_MMDevice::create_A");
03146 return sep;
03147 }
03148 return sep;
03149 }
03150
03151 AVStreams::StreamEndPoint_A_ptr
03152 TAO_MMDevice::create_A (AVStreams::StreamCtrl_ptr streamctrl,
03153 AVStreams::VDev_out the_vdev,
03154 AVStreams::streamQoS &stream_qos,
03155 CORBA::Boolean_out met_qos,
03156 char *&named_vdev,
03157 const AVStreams::flowSpec &flow_spec)
03158 {
03159 AVStreams::StreamEndPoint_A_ptr sep_a = 0;
03160 AVStreams::StreamEndPoint_var sep;
03161 try
03162 {
03163 sep = this->create_A_B (MMDEVICE_A, streamctrl, the_vdev, stream_qos, met_qos, named_vdev, flow_spec);
03164 sep_a = AVStreams::StreamEndPoint_A::_narrow (sep.in());
03165
03166 ACE_ASSERT( !CORBA::is_nil( sep_a ) );
03167 }
03168 catch (const CORBA::Exception& ex)
03169 {
03170 ex._tao_print_exception ("TAO_MMDevice::create_A");
03171 return sep_a;
03172 }
03173
03174 return sep_a;
03175 }
03176
03177
03178 AVStreams::StreamEndPoint_B_ptr
03179 TAO_MMDevice::create_B (AVStreams::StreamCtrl_ptr streamctrl,
03180 AVStreams::VDev_out the_vdev,
03181 AVStreams::streamQoS &stream_qos,
03182 CORBA::Boolean_out met_qos,
03183 char *&named_vdev,
03184 const AVStreams::flowSpec &flow_spec)
03185 {
03186 AVStreams::StreamEndPoint_B_ptr sep_b = AVStreams::StreamEndPoint_B::_nil ();
03187 AVStreams::StreamEndPoint_var sep;
03188
03189 try
03190 {
03191 sep = this->create_A_B (MMDEVICE_B, streamctrl, the_vdev, stream_qos, met_qos, named_vdev, flow_spec);
03192 sep_b = AVStreams::StreamEndPoint_B::_narrow (sep.in());
03193
03194 ACE_ASSERT ( !CORBA::is_nil( sep_b ) );
03195 }
03196 catch (const CORBA::Exception& ex)
03197 {
03198 ex._tao_print_exception ("TAO_MMDevice::create_B");
03199 return sep_b;
03200 }
03201 return sep_b;
03202 }
03203
03204
03205
03206 void
03207 TAO_MMDevice::destroy (AVStreams::StreamEndPoint_ptr ,
03208 const char * )
03209 {
03210
03211
03212
03213 int result = TAO_AV_Core::deactivate_servant (this);
03214 if (result < 0)
03215 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_MMDevice::destroy failed\n"));
03216 }
03217
03218 char *
03219 TAO_MMDevice::add_fdev_i (AVStreams::FDev_ptr fdev)
03220 {
03221 char* tmp;
03222 ACE_NEW_RETURN (tmp,
03223 char[64],
03224 0);
03225 CORBA::String_var flow_name = tmp;
03226
03227 try
03228 {
03229
03230
03231 ACE_OS::sprintf (tmp, "flow%d", flow_num_++);
03232 CORBA::Any flowname_any;
03233 flowname_any <<= flow_name.in ();
03234 fdev->define_property ("Flow", flowname_any);
03235 }
03236 catch (const CORBA::Exception& ex)
03237 {
03238 ex._tao_print_exception ("TAO_MMDevice::add_fdev");
03239 return 0;
03240 }
03241 return flow_name._retn ();
03242 }
03243
03244
03245 char *
03246 TAO_MMDevice::add_fdev (CORBA::Object_ptr fdev_obj)
03247 {
03248 CORBA::String_var flow_name;
03249 AVStreams::FDev_var fdev;
03250 try
03251 {
03252 CORBA::Any_ptr flow_name_any;
03253 fdev = AVStreams::FDev::_narrow (fdev_obj);
03254
03255 if (CORBA::is_nil (fdev.in ()))
03256 return 0;
03257
03258
03259 flow_name_any = fdev->get_property_value ("Flow");
03260
03261 const char *tmp;
03262 *flow_name_any >>= tmp;
03263 flow_name = CORBA::string_dup (tmp);
03264 }
03265 catch (const CORBA::Exception&)
03266 {
03267 flow_name =
03268 this->add_fdev_i (fdev.in ());
03269 }
03270
03271
03272
03273
03274 ACE_CString fdev_name_key ( flow_name.in () );
03275
03276
03277 if ( (this->fdev_map_.bind (fdev_name_key, fdev )) != 0)
03278 throw AVStreams::streamOpFailed ();
03279
03280 this->flow_count_++;
03281 this->flows_.length (this->flow_count_);
03282 this->flows_ [this->flow_count_-1] = flow_name;
03283
03284 CORBA::Any flows_any;
03285 flows_any <<= this->flows_;
03286 try
03287 {
03288 this->define_property ("Flows",
03289 flows_any);
03290 }
03291 catch (const CORBA::Exception& ex)
03292 {
03293 ex._tao_print_exception ("TAO_MMDevice::add_fdev");
03294 return 0;
03295 }
03296 return flow_name._retn ();
03297 }
03298
03299
03300 CORBA::Object_ptr
03301 TAO_MMDevice::get_fdev (const char *flow_name)
03302 {
03303
03304 ACE_CString fdev_name_key (flow_name);
03305 AVStreams::FDev_var fdev_entry;
03306 if (this->fdev_map_.find (fdev_name_key, fdev_entry) == 0)
03307 return fdev_entry._retn() ;
03308 return 0;
03309 }
03310
03311
03312 void
03313 TAO_MMDevice::remove_fdev (const char *flow_name)
03314 {
03315 try
03316 {
03317 ACE_CString fdev_name_key (flow_name);
03318 AVStreams::FDev_var fdev_entry;
03319
03320 if (this->fdev_map_.unbind (fdev_name_key, fdev_entry)!= 0)
03321 throw AVStreams::streamOpFailed ();
03322
03323 AVStreams::flowSpec new_flows (this->flows_.length ());
03324 for (u_int i=0, j=0 ; i <this->flows_.length (); i++)
03325 if (ACE_OS::strcmp (flow_name, this->flows_[i]) != 0)
03326 new_flows[j++] = this->flows_[i];
03327
03328 CORBA::Any flows;
03329 flows <<= new_flows;
03330 this->flows_ = new_flows;
03331 this->define_property ("Flows",
03332 flows);
03333 }
03334 catch (const CORBA::Exception& ex)
03335 {
03336 ex._tao_print_exception ("TAO_MMDevice::remove_fdev");
03337 }
03338 }
03339
03340
03341 TAO_MMDevice::~TAO_MMDevice (void)
03342 {
03343 delete this->stream_ctrl_;
03344 }
03345
03346
03347
03348
03349
03350
03351 TAO_FlowConnection::TAO_FlowConnection (void)
03352 :fp_name_ (CORBA::string_dup ("")),
03353 ip_multicast_ (0)
03354 {
03355 }
03356
03357
03358
03359
03360
03361
03362
03363
03364
03365 int
03366 TAO_FlowConnection::set_mcast_addr (ACE_CString mcast_addr, u_short mcast_port)
03367 {
03368 this->mcast_addr_ = mcast_addr;
03369 this->mcast_port_ = mcast_port;
03370 return 0;
03371 }
03372
03373 void
03374 TAO_FlowConnection::set_protocol (const char *protocol)
03375 {
03376 this->protocol_ = protocol;
03377 }
03378
03379
03380 void
03381 TAO_FlowConnection::stop (void)
03382 {
03383 try
03384 {
03385 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03386 ();
03387 for (FlowProducer_SetItor producer_end =
03388 this->flow_producer_set_.end ();
03389 producer_begin != producer_end; ++producer_begin)
03390 {
03391 (*producer_begin)->stop ();
03392 }
03393 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03394 ();
03395 for (FlowConsumer_SetItor consumer_end =
03396 this->flow_consumer_set_.end ();
03397 consumer_begin != consumer_end; ++consumer_begin)
03398 {
03399 (*consumer_begin)->stop ();
03400 }
03401 }
03402 catch(AVStreams::noSuchFlow& ex)
03403 {
03404 throw;
03405 }
03406 catch (const CORBA::Exception& ex)
03407 {
03408 ex._tao_print_exception ("TAO_FlowConnection::stop");
03409 throw;
03410 }
03411 catch(...)
03412 {
03413 printf ("TAO_FlowConnection::stop - unknown exception\n");
03414 }
03415 }
03416
03417
03418 void
03419 TAO_FlowConnection::start (void)
03420 {
03421 try
03422 {
03423 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03424 ();
03425 for (FlowConsumer_SetItor consumer_end =
03426 this->flow_consumer_set_.end ();
03427 consumer_begin != consumer_end; ++consumer_begin)
03428 {
03429 (*consumer_begin)->start ();
03430 }
03431 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03432 ();
03433 for (FlowProducer_SetItor producer_end =
03434 this->flow_producer_set_.end ();
03435 producer_begin != producer_end; ++producer_begin)
03436 {
03437 (*producer_begin)->start ();
03438 }
03439 }
03440 catch(AVStreams::noSuchFlow& ex)
03441 {
03442 throw;
03443 }
03444 catch (const CORBA::Exception& ex)
03445 {
03446 ex._tao_print_exception ("TAO_FlowConnection::start");
03447 throw;
03448 }
03449 catch(...)
03450 {
03451 printf ("TAO_FlowConnection::start - unknown exception\n");
03452 }
03453 }
03454
03455
03456 void
03457 TAO_FlowConnection::destroy (void)
03458 {
03459 try
03460 {
03461 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03462 ();
03463 for (FlowProducer_SetItor producer_end =
03464 this->flow_producer_set_.end ();
03465 producer_begin != producer_end; ++producer_begin)
03466 {
03467 (*producer_begin)->destroy ();
03468 }
03469 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03470 ();
03471 for (FlowConsumer_SetItor consumer_end =
03472 this->flow_consumer_set_.end ();
03473 consumer_begin != consumer_end; ++consumer_begin)
03474 {
03475 (*consumer_begin)->destroy ();
03476 }
03477 }
03478 catch (const CORBA::Exception& ex)
03479 {
03480 ex._tao_print_exception ("TAO_FlowConnection::destroy");
03481 return;
03482 }
03483 int result = TAO_AV_Core::deactivate_servant (this);
03484 if (result < 0)
03485 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::destroy failed\n"));
03486 }
03487
03488
03489 CORBA::Boolean
03490 TAO_FlowConnection::modify_QoS (AVStreams::QoS & new_qos)
03491 {
03492 ACE_UNUSED_ARG (new_qos);
03493 return 0;
03494 }
03495
03496
03497 CORBA::Boolean
03498 TAO_FlowConnection::use_flow_protocol (const char * fp_name,
03499 const CORBA::Any & fp_settings)
03500 {
03501 this->fp_name_ = fp_name;
03502 this->fp_settings_ = fp_settings;
03503 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03504 ();
03505 for (FlowProducer_SetItor producer_end =
03506 this->flow_producer_set_.end ();
03507 producer_begin != producer_end; ++producer_begin)
03508 {
03509 (*producer_begin)->use_flow_protocol
03510 (fp_name, fp_settings);
03511 }
03512 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03513 ();
03514 for (FlowConsumer_SetItor consumer_end =
03515 this->flow_consumer_set_.end ();
03516 consumer_begin != consumer_end; ++consumer_begin)
03517 {
03518 (*consumer_begin)->use_flow_protocol
03519 (fp_name, fp_settings);
03520 }
03521 return 1;
03522 }
03523
03524 void
03525 TAO_FlowConnection::push_event (const AVStreams::streamEvent & the_event)
03526 {
03527 ACE_UNUSED_ARG (the_event);
03528 }
03529
03530 CORBA::Boolean
03531 TAO_FlowConnection::connect_devs (AVStreams::FDev_ptr a_party,
03532 AVStreams::FDev_ptr b_party,
03533 AVStreams::QoS & flow_qos)
03534 {
03535 CORBA::Boolean result = 0;
03536 try
03537 {
03538 AVStreams::FlowConnection_var flowconnection = this->_this ();
03539 CORBA::Boolean met_qos;
03540 CORBA::String_var named_fdev ((const char *)"");
03541 AVStreams::FlowProducer_var producer =
03542 a_party->create_producer (flowconnection.in (),
03543 flow_qos,
03544 met_qos,
03545 named_fdev.inout ());
03546 AVStreams::FlowConsumer_var consumer =
03547 b_party->create_consumer (flowconnection.in (),
03548 flow_qos,
03549 met_qos,
03550 named_fdev.inout ());
03551 result = this->connect (producer.in (),
03552 consumer.in (),
03553 flow_qos);
03554 }
03555 catch (const CORBA::Exception& ex)
03556 {
03557 ex._tao_print_exception (
03558 "TAO_FlowConnection::connect_devs");
03559 return 0;
03560 }
03561 return result;
03562 }
03563
03564
03565 CORBA::Boolean
03566 TAO_FlowConnection::connect (AVStreams::FlowProducer_ptr producer,
03567 AVStreams::FlowConsumer_ptr consumer,
03568 AVStreams::QoS & the_qos)
03569 {
03570 try
03571 {
03572
03573 AVStreams::FlowProducer_ptr flow_producer =
03574 AVStreams::FlowProducer::_duplicate (producer);
03575 AVStreams::FlowConsumer_ptr flow_consumer =
03576 AVStreams::FlowConsumer::_duplicate (consumer);
03577
03578 this->flow_producer_set_.insert (flow_producer);
03579 this->flow_consumer_set_.insert (flow_consumer);
03580 AVStreams::FlowConnection_var flowconnection =
03581 this->_this ();
03582
03583 flow_producer->set_peer (flowconnection.in (),
03584 flow_consumer,
03585 the_qos);
03586
03587 flow_consumer->set_peer (flowconnection.in (),
03588 flow_producer,
03589 the_qos);
03590
03591 char *consumer_address =
03592 flow_consumer->go_to_listen (the_qos,
03593 0,
03594 flow_producer,
03595 this->fp_name_.inout ());
03596
03597 if (ACE_OS::strcmp (consumer_address, "") == 0)
03598 {
03599
03600 consumer_address = flow_producer->go_to_listen (the_qos,
03601 0,
03602 flow_consumer,
03603 this->fp_name_.inout ());
03604 flow_consumer->connect_to_peer (the_qos,
03605 consumer_address,
03606 this->fp_name_.inout ());
03607
03608
03609 }
03610 else
03611 {
03612 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::connect_to_peer addres: %s", consumer_address));
03613 flow_producer->connect_to_peer (the_qos,
03614 consumer_address,
03615 this->fp_name_.inout ());
03616 }
03617 }
03618 catch (const CORBA::Exception& ex)
03619 {
03620 ex._tao_print_exception ("TAO_FlowConnection::connect");
03621 }
03622 return 1;
03623 }
03624
03625
03626 CORBA::Boolean
03627 TAO_FlowConnection::disconnect (void)
03628 {
03629 return 0;
03630 }
03631
03632 CORBA::Boolean
03633 TAO_FlowConnection::add_producer (AVStreams::FlowProducer_ptr producer,
03634 AVStreams::QoS & the_qos)
03635 {
03636 try
03637 {
03638 AVStreams::FlowProducer_ptr flow_producer =
03639 AVStreams::FlowProducer::_duplicate (producer);
03640
03641
03642
03643
03644
03645 FlowProducer_SetItor begin = this->flow_producer_set_.begin ();
03646 FlowProducer_SetItor end = this->flow_producer_set_.end ();
03647 for (; begin != end; ++begin)
03648 {
03649 if ((*begin)->_is_equivalent (producer))
03650
03651 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
03652 }
03653
03654
03655
03656
03657 int result = this->flow_producer_set_.insert (flow_producer);
03658 if (result == 1)
03659 {
03660
03661 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
03662 }
03663 CORBA::Boolean met_qos;
03664 char mcast_address[BUFSIZ];
03665 if (this->producer_address_.in () == 0)
03666 {
03667 ACE_INET_Addr mcast_addr;
03668 mcast_addr.set (this->mcast_port_,
03669 this->mcast_addr_.c_str ()
03670 );
03671
03672 char buf [BUFSIZ];
03673 mcast_addr.addr_to_string (buf, BUFSIZ);
03674 ACE_OS::sprintf (mcast_address, "%s=%s", this->protocol_.in (), buf);
03675 }
03676 else
03677 {
03678 ACE_OS::strcpy (mcast_address, this->producer_address_.in ());
03679 }
03680 char *address = flow_producer->connect_mcast (the_qos,
03681 met_qos,
03682 mcast_address,
03683 this->fp_name_.in ());
03684
03685 if (this->producer_address_.in () == 0)
03686 {
03687 TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address);
03688 if (entry.address () != 0)
03689 {
03690
03691 this->producer_address_ = address;
03692 }
03693 else
03694 {
03695
03696 this->ip_multicast_ = 0;
03697 }
03698 }
03699
03700 if (CORBA::is_nil (this->mcastconfigif_.in ()))
03701 {
03702 ACE_NEW_RETURN (this->mcastconfigif_i_,
03703 TAO_MCastConfigIf,
03704 0);
03705 this->mcastconfigif_ = this->mcastconfigif_i_->_this ();
03706 }
03707 AVStreams::FlowConnection_var flowconnection = this->_this ();
03708 flow_producer->set_Mcast_peer (flowconnection.in (),
03709 this->mcastconfigif_.in (),
03710 the_qos);
03711 }
03712 catch (const CORBA::Exception& ex)
03713 {
03714 ex._tao_print_exception (
03715 "TAO_FlowConnection::add_producer");
03716 return 0;
03717 }
03718 return 1;
03719 }
03720
03721 CORBA::Boolean
03722 TAO_FlowConnection::add_consumer (AVStreams::FlowConsumer_ptr consumer,
03723 AVStreams::QoS & the_qos)
03724 {
03725 try
03726 {
03727 AVStreams::FlowConsumer_ptr flow_consumer =
03728 AVStreams::FlowConsumer::_duplicate (consumer);
03729 FlowConsumer_SetItor begin = this->flow_consumer_set_.begin ();
03730 FlowConsumer_SetItor end = this->flow_consumer_set_.end ();
03731 for (; begin != end; ++begin)
03732 {
03733 if ((*begin)->_is_equivalent (consumer))
03734
03735 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_Consumer: Consumer already exists\n"), 1);
03736 }
03737 int result = this->flow_consumer_set_.insert (flow_consumer);
03738 if (result == 1)
03739 {
03740
03741 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_consumer: consumer already exists\n"), 1);
03742 }
03743
03744 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin ();
03745
03746
03747
03748
03749 AVStreams::FlowProducer_ptr flow_producer = (*producer_begin);
03750
03751 AVStreams::protocolSpec protocols (1);
03752 protocols.length (1);
03753 protocols [0] = CORBA::string_dup (this->producer_address_.in ());
03754
03755 if (!this->ip_multicast_)
03756 {
03757 flow_consumer->set_protocol_restriction (protocols);
03758 char * address =
03759 flow_consumer->go_to_listen (the_qos,
03760 1,
03761 flow_producer,
03762 this->fp_name_.inout ());
03763 CORBA::Boolean is_met;
03764 flow_producer->connect_mcast (the_qos,
03765 is_met,
03766 address,
03767 this->fp_name_.inout ());
03768 }
03769 else
03770 {
03771
03772
03773
03774
03775
03776 flow_consumer->connect_to_peer (the_qos,
03777 this->producer_address_.in (),
03778 this->fp_name_.inout ());
03779
03780
03781
03782
03783
03784
03785
03786
03787 }
03788 if (CORBA::is_nil (this->mcastconfigif_.in ()))
03789 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowConnection::add_consumer: first add a producer and then a consumer\n"), 0);
03790
03791 AVStreams::flowSpec flow_spec;
03792 AVStreams::streamQoS stream_qos (1);
03793 stream_qos.length (1);
03794 stream_qos [0] = the_qos;
03795 this->mcastconfigif_->set_peer (flow_consumer,
03796 stream_qos,
03797 flow_spec);
03798 }
03799 catch (const CORBA::Exception& ex)
03800 {
03801 ex._tao_print_exception (
03802 "TAO_FlowConnection::add_consumer");
03803 return 0;
03804 }
03805 return 1;
03806 }
03807
03808 CORBA::Boolean
03809 TAO_FlowConnection::drop (AVStreams::FlowEndPoint_ptr target)
03810 {
03811 ACE_UNUSED_ARG (target);
03812 return 0;
03813 }
03814
03815
03816
03817
03818
03819
03820 TAO_FlowEndPoint::TAO_FlowEndPoint (void)
03821 :lock_ (0)
03822 {
03823 }
03824
03825 TAO_FlowEndPoint::TAO_FlowEndPoint (const char *flowname,
03826 AVStreams::protocolSpec &protocols,
03827 const char *format)
03828 {
03829 this->open (flowname, protocols, format);
03830 }
03831
03832 void
03833 TAO_FlowEndPoint::set_flow_handler (const char * ,
03834 TAO_AV_Flow_Handler * )
03835 {
03836 }
03837
03838 int
03839 TAO_FlowEndPoint::open (const char *flowname,
03840 AVStreams::protocolSpec &protocols,
03841 const char *format)
03842 {
03843 this->flowname_ = flowname;
03844 this->format_ = format;
03845
03846 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowEndPoint::open\n"));
03847 try
03848 {
03849 CORBA::Any flowname_any;
03850 flowname_any <<= flowname;
03851 this->define_property ("FlowName",
03852 flowname_any);
03853 this->set_format (format);
03854 this->protocol_addresses_ = protocols;
03855 AVStreams::protocolSpec protocol_spec (protocols.length ());
03856 protocol_spec.length (protocols.length ());
03857 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
03858 for (u_int i=0;i<protocols.length ();i++)
03859 {
03860 CORBA::String_var address = CORBA::string_dup (protocols [i]);
03861 TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address.in ());
03862 protocol_spec [i] = CORBA::string_dup (entry.carrier_protocol_str ());
03863 if (TAO_debug_level > 0)
03864 ACE_DEBUG ((LM_DEBUG,
03865 "[%s]\n",
03866 static_cast<char const*>(protocol_spec[i])));
03867 }
03868 this->set_protocol_restriction (protocol_spec);
03869 }
03870 catch (const CORBA::Exception& ex)
03871 {
03872 ex._tao_print_exception ("TAO_FlowEndPoint::open");
03873 return -1;
03874 }
03875 return 0;
03876 }
03877
03878
03879 int
03880 TAO_FlowEndPoint::set_flowname (const char *flowname)
03881 {
03882 this->flowname_ = flowname;
03883 return 0;
03884 }
03885
03886
03887
03888 CORBA::Boolean
03889 TAO_FlowEndPoint::lock (void)
03890 {
03891
03892
03893 if (this->lock_)
03894 return 0;
03895 this->lock_ = 1;
03896 return 1;
03897 }
03898
03899
03900 void
03901 TAO_FlowEndPoint::unlock (void)
03902 {
03903 this->lock_ = 0;
03904 }
03905
03906
03907 void
03908 TAO_FlowEndPoint::destroy (void)
03909 {
03910 int result = TAO_AV_Core::deactivate_servant (this);
03911 if (result < 0)
03912 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::destroy failed\n"));
03913 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
03914 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
03915 begin != end; ++begin)
03916 (*begin)->protocol_object ()->destroy ();
03917 }
03918
03919 AVStreams::StreamEndPoint_ptr
03920 TAO_FlowEndPoint::related_sep (void)
03921 {
03922
03923 return AVStreams::StreamEndPoint::_duplicate (this->related_sep_.in ());
03924 }
03925
03926 void
03927 TAO_FlowEndPoint::related_sep (AVStreams::StreamEndPoint_ptr related_sep)
03928 {
03929 this->related_sep_ = AVStreams::StreamEndPoint::_duplicate (related_sep);
03930 }
03931
03932 AVStreams::FlowConnection_ptr
03933 TAO_FlowEndPoint::related_flow_connection (void)
03934 {
03935 return AVStreams::FlowConnection::_duplicate (this->related_flow_connection_.in ());
03936 }
03937
03938 void
03939 TAO_FlowEndPoint::related_flow_connection (AVStreams::FlowConnection_ptr related_flow_connection)
03940 {
03941 this->related_flow_connection_ = AVStreams::FlowConnection::_duplicate (related_flow_connection);
03942 }
03943
03944
03945 AVStreams::FlowEndPoint_ptr
03946 TAO_FlowEndPoint::get_connected_fep (void)
03947 {
03948 return AVStreams::FlowEndPoint::_duplicate (this->peer_fep_.in ());
03949 }
03950
03951 CORBA::Boolean
03952 TAO_FlowEndPoint::use_flow_protocol (const char * fp_name,
03953 const CORBA::Any &)
03954 {
03955 try
03956 {
03957
03958 CORBA::Any flowname_property;
03959 flowname_property <<= fp_name;
03960 this->define_property ("FlowProtocol",
03961 flowname_property);
03962 }
03963 catch (const CORBA::Exception& ex)
03964 {
03965 ex._tao_print_exception (
03966 "TAO_FlowEndPoint::use_flow_protocol");
03967 return 0;
03968 }
03969 return 1;
03970 }
03971
03972 void
03973 TAO_FlowEndPoint::set_format (const char * format)
03974 {
03975 this->format_ = format;
03976 try
03977 {
03978
03979
03980 CORBA::Any format_val;
03981 format_val <<= format;
03982 this->define_property ("Format",
03983 format_val);
03984 }
03985 catch (const CORBA::Exception& ex)
03986 {
03987 ex._tao_print_exception ("TAO_FlowEndpoint::set_format");
03988 }
03989 }
03990
03991 void
03992 TAO_FlowEndPoint::set_dev_params (const CosPropertyService::Properties & new_settings)
03993 {
03994 this->dev_params_ = new_settings;
03995 try
03996 {
03997 CORBA::Any DevParams_property;
03998 DevParams_property <<= new_settings;
03999 this->define_property ("DevParams",
04000 DevParams_property);
04001 }
04002 catch (const CORBA::Exception& ex)
04003 {
04004 ex._tao_print_exception (
04005 "TAO_FlowEndPoint::set_dev_params");
04006 }
04007 }
04008
04009 void
04010 TAO_FlowEndPoint::set_protocol_restriction (const AVStreams::protocolSpec & protocols)
04011 {
04012 try
04013 {
04014 u_int i = 0;
04015 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
04016 for (i=0;i<protocols.length ();i++)
04017 {
04018 const char *protocol = (protocols)[i];
04019 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", protocol));
04020 }
04021 CORBA::Any AvailableProtocols_property;
04022 AvailableProtocols_property <<= protocols;
04023 this->define_property ("AvailableProtocols",
04024 AvailableProtocols_property);
04025 AVStreams::protocolSpec *temp_spec;
04026 CORBA::Any_var temp_any = this->get_property_value ("AvailableProtocols");
04027 temp_any.in () >>= temp_spec;
04028 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%N:%l\n"));
04029 for (i=0;i<temp_spec->length ();i++)
04030 {
04031 const char *protocol = (*temp_spec)[i];
04032 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "%s\n", protocol));
04033 }
04034 this->protocols_ = protocols;
04035 }
04036 catch (const CORBA::Exception& ex)
04037 {
04038 ex._tao_print_exception (
04039 "TAO_FlowEndpoint::set_protocol_restriction");
04040 }
04041 }
04042
04043 CORBA::Boolean
04044 TAO_FlowEndPoint::is_fep_compatible (AVStreams::FlowEndPoint_ptr peer_fep)
04045 {
04046 const char *exception_message = "";
04047 try
04048 {
04049
04050
04051
04052 CORBA::Any_var format_ptr;
04053 CORBA::String_var my_format, peer_format;
04054
04055 exception_message = "TAO_FlowEndPoint::is_fep_compatible - Format";
04056 format_ptr = this->get_property_value ("Format");
04057
04058 const char *temp_format;
04059 format_ptr.in () >>= temp_format;
04060 my_format = CORBA::string_dup (temp_format);
04061
04062 exception_message = "TAO_FlowEndPoint::is_fep_compatible - Format[2]";
04063 format_ptr = peer_fep->get_property_value ("Format");
04064 format_ptr.in () >>= temp_format;
04065 peer_format = CORBA::string_dup (temp_format);
04066 if (ACE_OS::strcmp (my_format.in (),
04067 peer_format.in ()) != 0)
04068 return 0;
04069
04070
04071 CORBA::Any_var AvailableProtocols_ptr;
04072 AVStreams::protocolSpec my_protocol_spec, peer_protocol_spec;
04073 AVStreams::protocolSpec *temp_protocols;;
04074
04075 exception_message =
04076 "TAO_FlowEndPoint::is_fep_compatible - AvailableProtocols";
04077 AvailableProtocols_ptr = this->get_property_value ("AvailableProtocols");
04078 AvailableProtocols_ptr.in () >>= temp_protocols;
04079 my_protocol_spec = *temp_protocols;
04080
04081 exception_message =
04082 "TAO_FlowEndPoint::is_fep_compatible - AvailableProtocols[2]";
04083 AvailableProtocols_ptr = peer_fep->get_property_value ("AvailableProtocols");
04084 AvailableProtocols_ptr.in () >>= temp_protocols;
04085 peer_protocol_spec = *temp_protocols;
04086
04087 int protocol_match = 0;
04088 for (u_int i=0;i<my_protocol_spec.length ();i++)
04089 {
04090 CORBA::String_var my_protocol_string;
04091 for (u_int j=0;j<peer_protocol_spec.length ();j++)
04092 {
04093 CORBA::String_var peer_protocol_string;
04094 my_protocol_string = CORBA::string_dup (my_protocol_spec[i]);
04095 peer_protocol_string = CORBA::string_dup (peer_protocol_spec[j]);
04096 if (ACE_OS::strcmp (my_protocol_string.in (), peer_protocol_string.in ()) == 0)
04097 {
04098 protocol_match = 1;
04099 break;
04100 }
04101 }
04102 if (protocol_match)
04103 break;
04104 }
04105 if (!protocol_match)
04106 return 0;
04107 }
04108 catch (const CosPropertyService::PropertyNotFound& nf)
04109 {
04110 nf._tao_print_exception (exception_message);
04111 }
04112 catch (const CORBA::Exception& ex)
04113 {
04114 ex._tao_print_exception ("TAO_FlowEndPoint::is_fep_compatible");
04115 return 0;
04116 }
04117 return 1;
04118 }
04119
04120 CORBA::Boolean
04121 TAO_FlowEndPoint::set_peer (AVStreams::FlowConnection_ptr ,
04122 AVStreams::FlowEndPoint_ptr the_peer_fep,
04123 AVStreams::QoS & )
04124 {
04125 this->peer_fep_ =
04126 AVStreams::FlowEndPoint::_duplicate (the_peer_fep);
04127 return 1;
04128 }
04129
04130 CORBA::Boolean
04131 TAO_FlowEndPoint::set_Mcast_peer (AVStreams::FlowConnection_ptr ,
04132 AVStreams::MCastConfigIf_ptr mcast_peer,
04133 AVStreams::QoS & )
04134 {
04135 this->mcast_peer_ = AVStreams::MCastConfigIf::_duplicate (mcast_peer);
04136 return 0;
04137 }
04138
04139 char *
04140 TAO_FlowEndPoint::go_to_listen_i (TAO_FlowSpec_Entry::Role role,
04141 AVStreams::QoS & ,
04142 CORBA::Boolean ,
04143 AVStreams::FlowEndPoint_ptr peer_fep,
04144 char *& flowProtocol)
04145 {
04146 char direction [BUFSIZ];
04147 switch (role)
04148 {
04149 case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
04150 ACE_OS::strcpy (direction, "IN");
04151 break;
04152 case TAO_FlowSpec_Entry::TAO_AV_CONSUMER:
04153 ACE_OS::strcpy (direction, "OUT");
04154 break;
04155 default:
04156 break;
04157 }
04158 AVStreams::protocolSpec my_protocol_spec, peer_protocol_spec;
04159 AVStreams::protocolSpec *temp_protocols;
04160 CORBA::Any_var AvailableProtocols_ptr =
04161 peer_fep->get_property_value ("AvailableProtocols");
04162 AvailableProtocols_ptr.in () >>= temp_protocols;
04163 peer_protocol_spec = *temp_protocols;
04164 AvailableProtocols_ptr =
04165 this->get_property_value ("AvailableProtocols");
04166 AvailableProtocols_ptr.in () >>= temp_protocols;
04167 my_protocol_spec = *temp_protocols;
04168 int protocol_match = 0;
04169 CORBA::String_var listen_protocol;
04170 u_int i =0;
04171 for (i=0;i<my_protocol_spec.length ();i++)
04172 {
04173 CORBA::String_var my_protocol_string;
04174 for (u_int j=0;j<peer_protocol_spec.length ();j++)
04175 {
04176 CORBA::String_var peer_protocol_string;
04177 my_protocol_string = CORBA::string_dup (my_protocol_spec[i]);
04178 peer_protocol_string = CORBA::string_dup (peer_protocol_spec[j]);
04179 if (ACE_OS::strcmp (my_protocol_string.in (), peer_protocol_string.in ()) == 0)
04180 {
04181 listen_protocol = my_protocol_string;
04182 protocol_match = 1;
04183 break;
04184 }
04185 }
04186 if (protocol_match)
04187 break;
04188 }
04189 if (!protocol_match)
04190 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowEndPoint::go_to_listen failed: no protoocol match\n"), 0);
04191
04192 for (u_int j=0;j<this->protocol_addresses_.length ();j++)
04193 if (ACE_OS::strncmp (this->protocol_addresses_ [j], listen_protocol.in (), ACE_OS::strlen (listen_protocol.in ())) == 0)
04194 {
04195
04196 TAO_Forward_FlowSpec_Entry *entry;
04197 ACE_NEW_RETURN (entry,
04198 TAO_Forward_FlowSpec_Entry (this->flowname_.in (),
04199 direction,
04200 this->format_.in (),
04201 flowProtocol,
04202 this->protocol_addresses_ [j]),
04203 0);
04204
04205 TAO_AV_Acceptor_Registry *acceptor_registry = TAO_AV_CORE::instance ()->acceptor_registry ();
04206 this->flow_spec_set_.insert (entry);
04207 int result = acceptor_registry->open (this,
04208 TAO_AV_CORE::instance (),
04209 this->flow_spec_set_);
04210 if (result < 0)
04211 return 0;
04212 char *listen_address = entry->get_local_addr_str ();
04213 char *address;
04214 ACE_NEW_RETURN (address,
04215 char [BUFSIZ],
04216 0);
04217 ACE_OS::sprintf (address, "%s=%s", listen_protocol.in (), listen_address);
04218 return address;
04219 }
04220 return 0;
04221 }
04222
04223
04224 CORBA::Boolean
04225 TAO_FlowEndPoint::connect_to_peer_i (TAO_FlowSpec_Entry::Role role,
04226 AVStreams::QoS & ,
04227 const char * address,
04228 const char * use_flow_protocol)
04229 {
04230 char direction [BUFSIZ];
04231 switch (role)
04232 {
04233 case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
04234 ACE_OS::strcpy (direction, "IN");
04235 break;
04236 case TAO_FlowSpec_Entry::TAO_AV_CONSUMER:
04237 ACE_OS::strcpy (direction, "OUT");
04238 break;
04239 default:
04240 break;
04241 }
04242 TAO_Forward_FlowSpec_Entry *entry;
04243 ACE_NEW_RETURN (entry,
04244 TAO_Forward_FlowSpec_Entry (this->flowname_.in (),
04245 direction,
04246 this->format_.in (),
04247 use_flow_protocol,
04248 address),
04249 0);
04250 this->flow_spec_set_.insert (entry);
04251 TAO_AV_Connector_Registry *connector_registry = TAO_AV_CORE::instance ()->connector_registry ();
04252 int result = connector_registry->open (this,
04253 TAO_AV_CORE::instance (),
04254 this->flow_spec_set_);
04255 if (result < 0)
04256 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowEndPoint::connector_registry::open failed\n"), 0);
04257 this->reverse_channel_ = entry->get_local_addr_str ();
04258 return 1;
04259 }
04260
04261 int
04262 TAO_FlowEndPoint::set_protocol_object (const char * ,
04263 TAO_AV_Protocol_Object * )
04264 {
04265 return 0;
04266 }
04267
04268
04269
04270
04271
04272
04273
04274 TAO_FlowProducer::TAO_FlowProducer (void)
04275 {
04276 }
04277
04278 TAO_FlowProducer::TAO_FlowProducer (const char *flowname,
04279 AVStreams::protocolSpec protocols,
04280 const char *format)
04281 {
04282 this->open (flowname, protocols, format);
04283 }
04284
04285
04286 char *
04287 TAO_FlowProducer::get_rev_channel (const char * )
04288 {
04289 return 0;
04290 }
04291
04292
04293 void
04294 TAO_FlowProducer::stop (void)
04295 {
04296 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04297 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04298 begin != end; ++begin)
04299 {
04300 TAO_FlowSpec_Entry *entry = (*begin);
04301 entry->handler ()->stop (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
04302 }
04303 }
04304
04305 void
04306 TAO_FlowProducer::start (void)
04307 {
04308 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04309 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04310 begin != end; ++begin)
04311 {
04312 TAO_FlowSpec_Entry *entry = (*begin);
04313 if (entry->handler () != 0)
04314 {
04315 entry->handler ()->start (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
04316 }
04317 if (entry->control_handler () != 0)
04318 {
04319 entry->control_handler ()->start (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
04320 }
04321 }
04322 }
04323
04324 char *
04325 TAO_FlowProducer::go_to_listen (AVStreams::QoS & the_qos,
04326 CORBA::Boolean is_mcast,
04327 AVStreams::FlowEndPoint_ptr peer_fep,
04328 char *& flowProtocol)
04329 {
04330 return this->go_to_listen_i (TAO_FlowSpec_Entry::TAO_AV_PRODUCER,
04331 the_qos,
04332 is_mcast,
04333 peer_fep,
04334 flowProtocol);
04335 }
04336
04337 CORBA::Boolean
04338 TAO_FlowProducer::connect_to_peer (AVStreams::QoS & the_qos,
04339 const char * address,
04340 const char * use_flow_protocol)
04341 {
04342 return this->connect_to_peer_i (TAO_FlowSpec_Entry::TAO_AV_PRODUCER,
04343 the_qos,
04344 address,
04345 use_flow_protocol);
04346 }
04347
04348 char *
04349 TAO_FlowProducer::connect_mcast (AVStreams::QoS & ,
04350 CORBA::Boolean_out ,
04351 const char *address,
04352 const char * use_flow_protocol)
04353 {
04354
04355 for (u_int i=0;i<this->protocols_.length ();i++)
04356 {
04357
04358 }
04359
04360 if (address == 0)
04361 if (TAO_debug_level > 0)
04362 ACE_DEBUG ((LM_DEBUG, "TAO_FlowProducer::connect_mcast address is 0\n"));
04363 TAO_Forward_FlowSpec_Entry *entry;
04364 ACE_NEW_RETURN (entry,
04365 TAO_Forward_FlowSpec_Entry(this->flowname_.in (),
04366 "IN",
04367 this->format_.in (),
04368 use_flow_protocol,
04369 address),
04370 0);
04371
04372 this->flow_spec_set_.insert (entry);
04373 TAO_AV_Acceptor_Registry *acceptor_registry =
04374 TAO_AV_CORE::instance ()->acceptor_registry ();
04375 int result = acceptor_registry->open (this,
04376 TAO_AV_CORE::instance (),
04377 this->flow_spec_set_);
04378 if (result < 0)
04379 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowProducer::connect_mcast:acceptor_registry open failed\n"), 0);
04380
04381
04382 ACE_Event_Handler *event_handler = entry->handler ()->event_handler ();
04383 event_handler->reactor ()->remove_handler (event_handler,
04384 ACE_Event_Handler::READ_MASK);
04385 return CORBA::string_dup (address);
04386 }
04387
04388
04389 void
04390 TAO_FlowProducer::set_key (const AVStreams::key & the_key)
04391 {
04392 try
04393 {
04394 CORBA::Any anyval;
04395 anyval <<= the_key;
04396 this->define_property ("PublicKey",
04397 anyval);
04398 }
04399 catch (const CORBA::Exception& ex)
04400 {
04401 ex._tao_print_exception ("TAO_FlowProducer::set_key");
04402 }
04403 }
04404
04405
04406 void
04407 TAO_FlowProducer::set_source_id (CORBA::Long source_id)
04408 {
04409 this->source_id_ = source_id;
04410 }
04411
04412
04413
04414
04415
04416
04417
04418 TAO_FlowConsumer::TAO_FlowConsumer (void)
04419 {
04420 }
04421
04422 TAO_FlowConsumer::TAO_FlowConsumer (const char *flowname,
04423 AVStreams::protocolSpec protocols,
04424 const char *format)
04425 {
04426 this->open (flowname, protocols, format);
04427 }
04428
04429
04430 void
04431 TAO_FlowConsumer::stop (void)
04432 {
04433 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04434 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04435 begin != end; ++begin)
04436 (*begin)->handler ()->stop (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
04437 }
04438
04439 void
04440 TAO_FlowConsumer::start (void)
04441 {
04442 TAO_AV_FlowSpecSetItor end = this->flow_spec_set_.end ();
04443 for (TAO_AV_FlowSpecSetItor begin = this->flow_spec_set_.begin ();
04444 begin != end; ++begin)
04445 {
04446 (*begin)->handler ()->start (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
04447 }
04448 }
04449
04450 char *
04451 TAO_FlowConsumer::go_to_listen (AVStreams::QoS & the_qos,
04452 CORBA::Boolean is_mcast,
04453 AVStreams::FlowEndPoint_ptr peer_fep,
04454 char *& flowProtocol)
04455 {
04456 return this->go_to_listen_i (TAO_FlowSpec_Entry::TAO_AV_CONSUMER,
04457 the_qos,
04458 is_mcast,
04459 peer_fep,
04460 flowProtocol);
04461 }
04462
04463 CORBA::Boolean
04464 TAO_FlowConsumer::connect_to_peer (AVStreams::QoS & the_qos,
04465 const char * address,
04466 const char * use_flow_protocol)
04467 {
04468 return this->connect_to_peer_i (TAO_FlowSpec_Entry::TAO_AV_CONSUMER,
04469 the_qos,
04470 address,
04471 use_flow_protocol);
04472 }
04473
04474
04475
04476
04477 TAO_Tokenizer::TAO_Tokenizer (const char *string, char delimiter)
04478 :token_array_ (10),
04479 count_ (0)
04480 {
04481 this->parse (string, delimiter);
04482 }
04483
04484 TAO_Tokenizer::~TAO_Tokenizer ()
04485 {
04486 for (unsigned int i=0; i<this->num_tokens_; i++)
04487 CORBA::string_free (this->token_array_[i]);
04488 }
04489
04490
04491 int
04492 TAO_Tokenizer::parse (const char *string, char delimiter)
04493 {
04494 ACE_CString new_string (string);
04495 u_int pos =0;
04496 ACE_CString::size_type slash_pos = 0;
04497 u_int count = 0;
04498 int result;
04499 while (pos < new_string.length ())
04500 {
04501 slash_pos = new_string.find (delimiter, pos);
04502 ACE_CString substring;
04503 if (slash_pos != new_string.npos)
04504 {
04505 substring = new_string.substring (pos,
04506 slash_pos - pos);
04507 pos = slash_pos + 1;
04508 }
04509 else
04510 {
04511 substring = new_string.substring (pos);
04512 pos = static_cast<int> (new_string.length ());
04513 }
04514 char *token = CORBA::string_dup (substring.c_str ());
04515 result = this->token_array_.set (token, count);
04516 if (result == -1)
04517 {
04518 this->token_array_.size (this->token_array_.size ()*2);
04519 result = this->token_array_.set (token, count);
04520 if (result == -1)
04521 ACE_ERROR_RETURN ((LM_ERROR, "TAO_Tokenizer::parse error"), -1);
04522 }
04523 count++;
04524 }
04525
04526
04527
04528
04529
04530
04531
04532
04533
04534
04535
04536
04537
04538
04539
04540
04541
04542
04543
04544
04545
04546 this->num_tokens_ = count;
04547 return 0;
04548 }
04549
04550 char*
04551 TAO_Tokenizer::token (void)
04552 {
04553 if (count_ < num_tokens_)
04554 return CORBA::string_dup (this->token_array_[this->count_++]);
04555 else
04556 return 0;
04557 }
04558
04559 int
04560 TAO_Tokenizer::num_tokens (void)
04561 {
04562 return static_cast<int> (this->num_tokens_);
04563 }
04564
04565 const char *
04566 TAO_Tokenizer::operator [] (size_t index) const
04567 {
04568 if (index >= this->num_tokens_)
04569 return 0;
04570
04571 return this->token_array_[index];
04572 }
04573
04574 TAO_END_VERSIONED_NAMESPACE_DECL