#include <AVStreams_i.h>
Collaboration diagram for TAO_FlowConnection:

Public Member Functions | |
| TAO_FlowConnection (void) | |
| default constructor. | |
| virtual void | stop (void) |
| stop this flow. | |
| virtual void | start (void) |
| start this flow. | |
| virtual void | destroy (void) |
| destroy this flow. | |
| virtual CORBA::Boolean | modify_QoS (AVStreams::QoS &new_qos) |
| modify the QoS for this flow. | |
| virtual CORBA::Boolean | use_flow_protocol (const char *fp_name, const CORBA::Any &fp_settings) |
| use the specified flow protocol for this flow. | |
| virtual void | push_event (const AVStreams::streamEvent &the_event) |
| pushes an event, to be handled by the application. | |
| virtual CORBA::Boolean | connect_devs (AVStreams::FDev_ptr a_party, AVStreams::FDev_ptr b_party, AVStreams::QoS &the_qos) |
| connect 2 Flow Devices. | |
| virtual CORBA::Boolean | connect (AVStreams::FlowProducer_ptr flow_producer, AVStreams::FlowConsumer_ptr flow_consumer, AVStreams::QoS &the_qos) |
| Connect a flow producer and consumer under this flow connection. | |
| virtual CORBA::Boolean | disconnect (void) |
| disconnect this flow connection. | |
| virtual CORBA::Boolean | add_producer (AVStreams::FlowProducer_ptr flow_producer, AVStreams::QoS &the_qos) |
| adds the producer to this flow connection. | |
| virtual CORBA::Boolean | add_consumer (AVStreams::FlowConsumer_ptr flow_consumer, AVStreams::QoS &the_qos) |
| adds a consumer to this flow connection. | |
| virtual CORBA::Boolean | drop (AVStreams::FlowEndPoint_ptr target) |
| drops a flow endpoint from the flow. | |
| int | set_mcast_addr (ACE_CString addr, u_short port) |
| void | set_protocol (const char *protocol) |
Protected Types | |
| typedef ACE_Unbounded_Set< AVStreams::FlowProducer_ptr > | FlowProducer_Set |
| typedef ACE_Unbounded_Set_Iterator< AVStreams::FlowProducer_ptr > | FlowProducer_SetItor |
| typedef ACE_Unbounded_Set< AVStreams::FlowConsumer_ptr > | FlowConsumer_Set |
| typedef ACE_Unbounded_Set_Iterator< AVStreams::FlowConsumer_ptr > | FlowConsumer_SetItor |
Protected Attributes | |
| FlowProducer_Set | flow_producer_set_ |
| The multicast address returned by the producer. | |
| FlowConsumer_Set | flow_consumer_set_ |
| CORBA::String_var | fp_name_ |
| CORBA::Any | fp_settings_ |
| CORBA::String_var | producer_address_ |
| int | ip_multicast_ |
| IP Multicasting is used. | |
| TAO_MCastConfigIf * | mcastconfigif_i_ |
| AVStreams::MCastConfigIf_var | mcastconfigif_ |
| u_short | mcast_port_ |
| ACE_CString | mcast_addr_ |
| CORBA::String_var | protocol_ |
Definition at line 801 of file AVStreams_i.h.
|
|
Definition at line 859 of file AVStreams_i.h. |
|
|
Definition at line 860 of file AVStreams_i.h. Referenced by add_consumer(), destroy(), start(), stop(), and use_flow_protocol(). |
|
|
Definition at line 857 of file AVStreams_i.h. |
|
|
Definition at line 858 of file AVStreams_i.h. Referenced by add_consumer(), add_producer(), destroy(), start(), stop(), and use_flow_protocol(). |
|
|
default constructor.
Definition at line 3319 of file AVStreams_i.cpp.
03320 :fp_name_ (CORBA::string_dup ("")), 03321 ip_multicast_ (0) 03322 { 03323 } |
|
||||||||||||
|
adds a consumer to this flow connection.
Definition at line 3674 of file AVStreams_i.cpp. References ACE_ERROR_RETURN, ACE_Unbounded_Set< T >::begin(), ACE_Unbounded_Set< T >::end(), flow_consumer_set_, flow_producer_set_, FlowConsumer_SetItor, FlowProducer_SetItor, AVStreams::flowSpec, fp_name_, ACE_Unbounded_Set< T >::insert(), ip_multicast_, CORBA::is_nil(), LM_ERROR, LM_WARNING, AVStreams::protocolSpec, AVStreams::streamQoS, and CORBA::string_dup().
03676 {
03677 try
03678 {
03679 AVStreams::FlowConsumer_ptr flow_consumer =
03680 AVStreams::FlowConsumer::_duplicate (consumer);
03681 FlowConsumer_SetItor begin = this->flow_consumer_set_.begin ();
03682 FlowConsumer_SetItor end = this->flow_consumer_set_.end ();
03683 for (; begin != end; ++begin)
03684 {
03685 if ((*begin)->_is_equivalent (consumer))
03686 // Consumer exists in the set, a duplicate.
03687 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_Consumer: Consumer already exists\n"), 1);
03688 }
03689 int result = this->flow_consumer_set_.insert (flow_consumer);
03690 if (result == 1)
03691 {
03692 // consumer exists in the set, a duplicate.
03693 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_consumer: consumer already exists\n"), 1);
03694 }
03695
03696 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin ();
03697 // @@Lets take that the first entry as the only producer. We're
03698 // not sure if we can have multiple flow producers in a
03699 // flowconnection. We can have multiple producer in the MtM binding,
03700 // in which case the first producer that gets added is the leader.
03701 AVStreams::FlowProducer_ptr flow_producer = (*producer_begin);
03702
03703 AVStreams::protocolSpec protocols (1);
03704 protocols.length (1);
03705 protocols [0] = CORBA::string_dup (this->producer_address_.in ());
03706
03707 if (!this->ip_multicast_)
03708 {
03709 flow_consumer->set_protocol_restriction (protocols);
03710 char * address =
03711 flow_consumer->go_to_listen (the_qos,
03712 1,
03713 flow_producer,
03714 this->fp_name_.inout ());
03715 CORBA::Boolean is_met;
03716 flow_producer->connect_mcast (the_qos,
03717 is_met,
03718 address,
03719 this->fp_name_.inout ());
03720 }
03721 else
03722 {
03723 // The spec says go_to_listen is called with the multicast
03724 // address returned from the connect_mcast call called
03725 // during add_producer. But go_to_listen doesn't have a
03726 // address parameter. I guess it should be connect_to_peer.
03727 // IP Multicasting.
03728 flow_consumer->connect_to_peer (the_qos,
03729 this->producer_address_.in (),
03730 this->fp_name_.inout ());
03731
03732 // char * address =
03733 // flow_consumer->go_to_listen (the_qos,
03734 // 1,
03735 // flow_producer,
03736 // this->fp_name_.inout ()
03737 //);
03738
03739 }
03740 if (CORBA::is_nil (this->mcastconfigif_.in ()))
03741 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowConnection::add_consumer: first add a producer and then a consumer\n"), 0);
03742 // @@ Is this the right place to do set_peer?
03743 AVStreams::flowSpec flow_spec;
03744 AVStreams::streamQoS stream_qos (1);
03745 stream_qos.length (1);
03746 stream_qos [0] = the_qos;
03747 this->mcastconfigif_->set_peer (flow_consumer,
03748 stream_qos,
03749 flow_spec);
03750 }
03751 catch (const CORBA::Exception& ex)
03752 {
03753 ex._tao_print_exception (
03754 "TAO_FlowConnection::add_consumer");
03755 return 0;
03756 }
03757 return 1;
03758 }
|
|
||||||||||||
|
adds the producer to this flow connection.
Definition at line 3585 of file AVStreams_i.cpp. References ACE_ERROR_RETURN, ACE_NEW_RETURN, ACE_INET_Addr::addr_to_string(), TAO_FlowSpec_Entry::address(), ACE_Unbounded_Set< T >::begin(), ACE_Unbounded_Set< T >::end(), flow_producer_set_, FlowProducer_SetItor, ACE_Unbounded_Set< T >::insert(), ip_multicast_, CORBA::is_nil(), LM_WARNING, mcastconfigif_i_, producer_address_, ACE_INET_Addr::set(), ACE_OS::sprintf(), and ACE_OS::strcpy().
03587 {
03588 try
03589 {
03590 AVStreams::FlowProducer_ptr flow_producer =
03591 AVStreams::FlowProducer::_duplicate (producer);
03592 // @@Naga:Sometimes the same producer could be added with a different object reference.
03593 // There's no portable way of comparing obj refs. but we have to do this till we find
03594 // a permanent solution.For eg. 2 different flowproducers for the same flow in a
03595 // Multipoint to Multipoint binding will have the same flowname and hence cannot be
03596 // used for resolving ties.
03597 FlowProducer_SetItor begin = this->flow_producer_set_.begin ();
03598 FlowProducer_SetItor end = this->flow_producer_set_.end ();
03599 for (; begin != end; ++begin)
03600 {
03601 if ((*begin)->_is_equivalent (producer))
03602 // producer exists in the set, a duplicate.
03603 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
03604 }
03605 // We need to check the return value of the insert into the flow producer
03606 // set, since multiconnect could be called many times which will lead to
03607 // a call to add_producer every time a sink is added. If the producer is already
03608 // present in our list we just return immediately.
03609 int result = this->flow_producer_set_.insert (flow_producer);
03610 if (result == 1)
03611 {
03612 // producer exists in the set, a duplicate.
03613 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
03614 }
03615 CORBA::Boolean met_qos;
03616 char mcast_address[BUFSIZ];
03617 if (this->producer_address_.in () == 0)
03618 {
03619 ACE_INET_Addr mcast_addr;
03620 mcast_addr.set (this->mcast_port_,
03621 this->mcast_addr_.c_str ()
03622 );
03623
03624 char buf [BUFSIZ];
03625 mcast_addr.addr_to_string (buf, BUFSIZ);
03626 ACE_OS::sprintf (mcast_address, "%s=%s", this->protocol_.in (), buf);
03627 }
03628 else
03629 {
03630 ACE_OS::strcpy (mcast_address, this->producer_address_.in ());
03631 }
03632 char *address = flow_producer->connect_mcast (the_qos,
03633 met_qos,
03634 mcast_address,
03635 this->fp_name_.in ());
03636
03637 if (this->producer_address_.in () == 0)
03638 {
03639 TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address);
03640 if (entry.address () != 0)
03641 {
03642 // Internet multicasting is in use.
03643 this->producer_address_ = address;
03644 }
03645 else
03646 {
03647 // ATM Multicasting is in use.
03648 this->ip_multicast_ = 0;
03649 }
03650 }
03651 // set the multicast peer.
03652 if (CORBA::is_nil (this->mcastconfigif_.in ()))
03653 {
03654 ACE_NEW_RETURN (this->mcastconfigif_i_,
03655 TAO_MCastConfigIf,
03656 0);
03657 this->mcastconfigif_ = this->mcastconfigif_i_->_this ();
03658 }
03659 AVStreams::FlowConnection_var flowconnection = this->_this ();
03660 flow_producer->set_Mcast_peer (flowconnection.in (),
03661 this->mcastconfigif_.in (),
03662 the_qos);
03663 }
03664 catch (const CORBA::Exception& ex)
03665 {
03666 ex._tao_print_exception (
03667 "TAO_FlowConnection::add_producer");
03668 return 0;
03669 }
03670 return 1;
03671 }
|
|
||||||||||||||||
|
Connect a flow producer and consumer under this flow connection.
Definition at line 3518 of file AVStreams_i.cpp. References ACE_DEBUG, flow_consumer_set_, flow_producer_set_, ACE_Unbounded_Set< T >::insert(), LM_DEBUG, ACE_OS::strcmp(), and TAO_debug_level. Referenced by connect_devs().
03521 {
03522 try
03523 {
03524
03525 AVStreams::FlowProducer_ptr flow_producer =
03526 AVStreams::FlowProducer::_duplicate (producer);
03527 AVStreams::FlowConsumer_ptr flow_consumer =
03528 AVStreams::FlowConsumer::_duplicate (consumer);
03529
03530 this->flow_producer_set_.insert (flow_producer);
03531 this->flow_consumer_set_.insert (flow_consumer);
03532 AVStreams::FlowConnection_var flowconnection =
03533 this->_this ();
03534
03535 flow_producer->set_peer (flowconnection.in (),
03536 flow_consumer,
03537 the_qos);
03538
03539 flow_consumer->set_peer (flowconnection.in (),
03540 flow_producer,
03541 the_qos);
03542
03543 char *consumer_address =
03544 flow_consumer->go_to_listen (the_qos,
03545 0, // false for is_mcast
03546 flow_producer,
03547 this->fp_name_.inout ());
03548
03549 if (ACE_OS::strcmp (consumer_address, "") == 0)
03550 {
03551 // Consumer is not willing to listen, so try the producer.
03552 consumer_address = flow_producer->go_to_listen (the_qos,
03553 0, // false for is_mcast
03554 flow_consumer,
03555 this->fp_name_.inout ());
03556 flow_consumer->connect_to_peer (the_qos,
03557 consumer_address,
03558 this->fp_name_.inout ());
03559 // @@ Naga: We have to find means to set the reverse channel for the producer.
03560 // Its broken in the point-to_point case for UDP.
03561 }
03562 else
03563 {
03564 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::connect_to_peer addres: %s", consumer_address));
03565 flow_producer->connect_to_peer (the_qos,
03566 consumer_address,
03567 this->fp_name_.inout ());
03568 }
03569 }
03570 catch (const CORBA::Exception& ex)
03571 {
03572 ex._tao_print_exception ("TAO_FlowConnection::connect");
03573 }
03574 return 1;
03575 }
|
|
||||||||||||||||
|
connect 2 Flow Devices.
Definition at line 3483 of file AVStreams_i.cpp. References connect().
03486 {
03487 CORBA::Boolean result = 0;
03488 try
03489 {
03490 AVStreams::FlowConnection_var flowconnection = this->_this ();
03491 CORBA::Boolean met_qos;
03492 CORBA::String_var named_fdev ((const char *)"");
03493 AVStreams::FlowProducer_var producer =
03494 a_party->create_producer (flowconnection.in (),
03495 flow_qos,
03496 met_qos,
03497 named_fdev.inout ());
03498 AVStreams::FlowConsumer_var consumer =
03499 b_party->create_consumer (flowconnection.in (),
03500 flow_qos,
03501 met_qos,
03502 named_fdev.inout ());
03503 result = this->connect (producer.in (),
03504 consumer.in (),
03505 flow_qos);
03506 }
03507 catch (const CORBA::Exception& ex)
03508 {
03509 ex._tao_print_exception (
03510 "TAO_FlowConnection::connect_devs");
03511 return 0;
03512 }
03513 return result;
03514 }
|
|
|
destroy this flow.
Definition at line 3409 of file AVStreams_i.cpp. References ACE_DEBUG, ACE_Unbounded_Set< T >::begin(), TAO_AV_Core::deactivate_servant(), ACE_Unbounded_Set< T >::end(), flow_consumer_set_, flow_producer_set_, FlowConsumer_SetItor, FlowProducer_SetItor, LM_DEBUG, and TAO_debug_level.
03410 {
03411 try
03412 {
03413 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03414 ();
03415 for (FlowProducer_SetItor producer_end =
03416 this->flow_producer_set_.end ();
03417 producer_begin != producer_end; ++producer_begin)
03418 {
03419 (*producer_begin)->destroy ();
03420 }
03421 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03422 ();
03423 for (FlowConsumer_SetItor consumer_end =
03424 this->flow_consumer_set_.end ();
03425 consumer_begin != consumer_end; ++consumer_begin)
03426 {
03427 (*consumer_begin)->destroy ();
03428 }
03429 }
03430 catch (const CORBA::Exception& ex)
03431 {
03432 ex._tao_print_exception ("TAO_FlowConnection::destroy");
03433 return;
03434 }
03435 int result = TAO_AV_Core::deactivate_servant (this);
03436 if (result < 0)
03437 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::destroy failed\n"));
03438 }
|
|
|
disconnect this flow connection. The implementation listed below is a stub: it performs no disconnection and always returns 0.
Definition at line 3579 of file AVStreams_i.cpp.
03580 {
03581 return 0;
03582 }
|
|
|
drops a flow endpoint from the flow. The implementation listed below is a stub: it ignores target and always returns 0.
Definition at line 3761 of file AVStreams_i.cpp.
03762 {
03763 ACE_UNUSED_ARG (target);
03764 return 0;
03765 }
|
|
|
modify the QoS for this flow. The implementation listed below is a stub: it ignores new_qos and always returns 0.
Definition at line 3442 of file AVStreams_i.cpp.
03443 {
03444 ACE_UNUSED_ARG (new_qos);
03445 return 0;
03446 }
|
|
|
pushes an event, to be handled by the application. The implementation listed below ignores the_event and does nothing else.
Definition at line 3477 of file AVStreams_i.cpp. References AVStreams::streamEvent.
03478 {
03479 ACE_UNUSED_ARG (the_event);
03480 }
|
|
||||||||||||
|
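set_mcast_addr: stores the given multicast address and port in mcast_addr_ and mcast_port_; always returns 0.
Definition at line 3334 of file AVStreams_i.cpp. Referenced by TAO_StreamEndPoint_A::multiconnect().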
03335 {
03336 this->mcast_addr_ = mcast_addr;
03337 this->mcast_port_ = mcast_port;
03338 return 0;
03339 }
|
|
|
set_protocol: stores the given protocol name in protocol_.
Definition at line 3342 of file AVStreams_i.cpp. Referenced by TAO_StreamEndPoint_A::multiconnect().
03343 {
03344 this->protocol_ = protocol;
03345 }
|
|
|
start this flow.
Definition at line 3379 of file AVStreams_i.cpp. References ACE_Unbounded_Set< T >::begin(), ACE_Unbounded_Set< T >::end(), flow_consumer_set_, flow_producer_set_, FlowConsumer_SetItor, and FlowProducer_SetItor.
03380 {
03381 try
03382 {
03383 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03384 ();
03385 for (FlowConsumer_SetItor consumer_end =
03386 this->flow_consumer_set_.end ();
03387 consumer_begin != consumer_end; ++consumer_begin)
03388 {
03389 (*consumer_begin)->start ();
03390 }
03391 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03392 ();
03393 for (FlowProducer_SetItor producer_end =
03394 this->flow_producer_set_.end ();
03395 producer_begin != producer_end; ++producer_begin)
03396 {
03397 (*producer_begin)->start ();
03398 }
03399 }
03400 catch (const CORBA::Exception& ex)
03401 {
03402 ex._tao_print_exception ("TAO_FlowConnection::start");
03403 return;
03404 }
03405 }
|
|
|
stop this flow.
Definition at line 3349 of file AVStreams_i.cpp. References ACE_Unbounded_Set< T >::begin(), ACE_Unbounded_Set< T >::end(), flow_consumer_set_, flow_producer_set_, FlowConsumer_SetItor, and FlowProducer_SetItor.
03350 {
03351 try
03352 {
03353 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03354 ();
03355 for (FlowProducer_SetItor producer_end =
03356 this->flow_producer_set_.end ();
03357 producer_begin != producer_end; ++producer_begin)
03358 {
03359 (*producer_begin)->stop ();
03360 }
03361 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03362 ();
03363 for (FlowConsumer_SetItor consumer_end =
03364 this->flow_consumer_set_.end ();
03365 consumer_begin != consumer_end; ++consumer_begin)
03366 {
03367 (*consumer_begin)->stop ();
03368 }
03369 }
03370 catch (const CORBA::Exception& ex)
03371 {
03372 ex._tao_print_exception ("TAO_FlowConnection::stop");
03373 return;
03374 }
03375 }
|
|
||||||||||||
|
use the specified flow protocol for this flow.
Definition at line 3450 of file AVStreams_i.cpp. References ACE_Unbounded_Set< T >::begin(), ACE_Unbounded_Set< T >::end(), flow_consumer_set_, flow_producer_set_, FlowConsumer_SetItor, FlowProducer_SetItor, fp_name_, and fp_settings_.
03452 {
03453 this->fp_name_ = fp_name;
03454 this->fp_settings_ = fp_settings;
03455 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
03456 ();
03457 for (FlowProducer_SetItor producer_end =
03458 this->flow_producer_set_.end ();
03459 producer_begin != producer_end; ++producer_begin)
03460 {
03461 (*producer_begin)->use_flow_protocol
03462 (fp_name, fp_settings);
03463 }
03464 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
03465 ();
03466 for (FlowConsumer_SetItor consumer_end =
03467 this->flow_consumer_set_.end ();
03468 consumer_begin != consumer_end; ++consumer_begin)
03469 {
03470 (*consumer_begin)->use_flow_protocol
03471 (fp_name, fp_settings);
03472 }
03473 return 1;
03474 }
|
|
|
Definition at line 864 of file AVStreams_i.h. Referenced by add_consumer(), connect(), destroy(), start(), stop(), and use_flow_protocol(). |
|
|
The multicast address returned by the producer.
Definition at line 863 of file AVStreams_i.h. Referenced by add_consumer(), add_producer(), connect(), destroy(), start(), stop(), and use_flow_protocol(). |
|
|
Definition at line 865 of file AVStreams_i.h. Referenced by add_consumer(), and use_flow_protocol(). |
|
|
Definition at line 866 of file AVStreams_i.h. Referenced by use_flow_protocol(). |
|
|
IP Multicasting is used.
Definition at line 870 of file AVStreams_i.h. Referenced by add_consumer(), and add_producer(). |
|
|
Definition at line 874 of file AVStreams_i.h. |
|
|
Definition at line 873 of file AVStreams_i.h. |
|
|
Definition at line 872 of file AVStreams_i.h. |
|
|
Definition at line 871 of file AVStreams_i.h. Referenced by add_producer(). |
|
|
Definition at line 867 of file AVStreams_i.h. Referenced by add_producer(). |
|
|
Definition at line 875 of file AVStreams_i.h. |
1.3.6