This class currently supports only one producer and one consumer per flow. More...
#include <AVStreams_i.h>
Public Member Functions | |
TAO_FlowConnection (void) | |
default constructor. | |
virtual void | stop (void) |
stop this flow. | |
virtual void | start (void) |
start this flow. | |
virtual void | destroy (void) |
destroy this flow. | |
virtual CORBA::Boolean | modify_QoS (AVStreams::QoS &new_qos) |
modify the QoS for this flow. | |
virtual CORBA::Boolean | use_flow_protocol (const char *fp_name, const CORBA::Any &fp_settings) |
use the specified flow protocol for this flow. | |
virtual void | push_event (const AVStreams::streamEvent &the_event) |
pushes an event, to be handled by the application. | |
virtual CORBA::Boolean | connect_devs (AVStreams::FDev_ptr a_party, AVStreams::FDev_ptr b_party, AVStreams::QoS &the_qos) |
connect 2 Flow Devices. | |
virtual CORBA::Boolean | connect (AVStreams::FlowProducer_ptr flow_producer, AVStreams::FlowConsumer_ptr flow_consumer, AVStreams::QoS &the_qos) |
Connect a flow producer and consumer under this flow connection. | |
virtual CORBA::Boolean | disconnect (void) |
disconnect this flow connection. | |
virtual CORBA::Boolean | add_producer (AVStreams::FlowProducer_ptr flow_producer, AVStreams::QoS &the_qos) |
adds the producer to this flow connection. | |
virtual CORBA::Boolean | add_consumer (AVStreams::FlowConsumer_ptr flow_consumer, AVStreams::QoS &the_qos) |
adds a consumer to this flow connection. | |
virtual CORBA::Boolean | drop (AVStreams::FlowEndPoint_ptr target) |
drops a flow endpoint from the flow. | |
int | set_mcast_addr (ACE_CString addr, u_short port) |
void | set_protocol (const char *protocol) |
Protected Types | |
typedef ACE_Unbounded_Set < AVStreams::FlowProducer_ptr > | FlowProducer_Set |
typedef ACE_Unbounded_Set_Iterator < AVStreams::FlowProducer_ptr > | FlowProducer_SetItor |
typedef ACE_Unbounded_Set < AVStreams::FlowConsumer_ptr > | FlowConsumer_Set |
typedef ACE_Unbounded_Set_Iterator < AVStreams::FlowConsumer_ptr > | FlowConsumer_SetItor |
Protected Attributes | |
FlowProducer_Set | flow_producer_set_ |
The multicast address returned by the producer. | |
FlowConsumer_Set | flow_consumer_set_ |
CORBA::String_var | fp_name_ |
CORBA::Any | fp_settings_ |
CORBA::String_var | producer_address_ |
int | ip_multicast_ |
IP Multicasting is used. | |
TAO_MCastConfigIf * | mcastconfigif_i_ |
AVStreams::MCastConfigIf_var | mcastconfigif_ |
u_short | mcast_port_ |
ACE_CString | mcast_addr_ |
CORBA::String_var | protocol_ |
This class currently supports only one producer and one consumer per flow.
Definition at line 801 of file AVStreams_i.h.
typedef ACE_Unbounded_Set<AVStreams::FlowConsumer_ptr> TAO_FlowConnection::FlowConsumer_Set [protected] |
Definition at line 859 of file AVStreams_i.h.
typedef ACE_Unbounded_Set_Iterator<AVStreams::FlowConsumer_ptr> TAO_FlowConnection::FlowConsumer_SetItor [protected] |
Definition at line 860 of file AVStreams_i.h.
typedef ACE_Unbounded_Set<AVStreams::FlowProducer_ptr> TAO_FlowConnection::FlowProducer_Set [protected] |
Definition at line 857 of file AVStreams_i.h.
typedef ACE_Unbounded_Set_Iterator<AVStreams::FlowProducer_ptr> TAO_FlowConnection::FlowProducer_SetItor [protected] |
Definition at line 858 of file AVStreams_i.h.
TAO_FlowConnection::TAO_FlowConnection | ( | void | ) |
default constructor.
Definition at line 3352 of file AVStreams_i.cpp.
// Member initializers for the default constructor.  The flow-protocol name
// starts out as an empty string and IP multicasting is disabled.  The raw
// servant pointer and the multicast port are zeroed as well so that they are
// never read uninitialized: add_producer reads mcast_port_ even when
// set_mcast_addr was never called, and mcastconfigif_i_ is only assigned
// once add_producer creates the servant.
  : fp_name_ (CORBA::string_dup ("")),
    ip_multicast_ (0),
    mcastconfigif_i_ (0),
    mcast_port_ (0)
{
}
CORBA::Boolean TAO_FlowConnection::add_consumer | ( | AVStreams::FlowConsumer_ptr | flow_consumer, | |
AVStreams::QoS & | the_qos | |||
) | [virtual] |
adds a consumer to this flow connection.
Definition at line 3723 of file AVStreams_i.cpp.
{
  // Adds `consumer` to this flow connection and wires it to the first
  // registered producer.
  //
  // Returns 1 on success and also when the consumer is already part of the
  // flow (a warning is logged).  Returns 0 when no producer has been added
  // yet or when any of the CORBA calls raises an exception.
  try
    {
      // A producer must have been added first: add_producer is what creates
      // mcastconfigif_, and the first entry of the producer set is needed
      // below.  Checking up front avoids dereferencing an empty set and
      // avoids registering / connecting a consumer that cannot be fully
      // configured.
      if (this->flow_producer_set_.is_empty ()
          || CORBA::is_nil (this->mcastconfigif_.in ()))
        {
          ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowConnection::add_consumer: first add a producer and then a consumer\n"), 0);
        }

      // Reject duplicates before taking a new reference, so that nothing
      // has to be released on this early-return path.
      FlowConsumer_SetItor consumer_end = this->flow_consumer_set_.end ();
      for (FlowConsumer_SetItor consumer_it = this->flow_consumer_set_.begin ();
           consumer_it != consumer_end;
           ++consumer_it)
        {
          if ((*consumer_it)->_is_equivalent (consumer))
            {
              // Consumer exists in the set, a duplicate.
              ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_Consumer: Consumer already exists\n"), 1);
            }
        }

      // Hold the duplicated reference in a _var until the set has accepted
      // it; if insert reports a duplicate the _var releases it for us.
      AVStreams::FlowConsumer_var consumer_ref =
        AVStreams::FlowConsumer::_duplicate (consumer);
      int const result = this->flow_consumer_set_.insert (consumer_ref.in ());
      if (result == 1)
        {
          // consumer exists in the set, a duplicate.
          ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_consumer: consumer already exists\n"), 1);
        }
      // The set stores the raw pointer, so the reference now belongs to it.
      AVStreams::FlowConsumer_ptr flow_consumer = consumer_ref._retn ();

      // @@Lets take that the first entry as the only producer. We're
      // not sure if we can have multiple flow producers in a
      // flowconnection. We can have multiple producer in the MtM binding,
      // in which case the first producer that gets added is the leader.
      FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin ();
      AVStreams::FlowProducer_ptr flow_producer = (*producer_begin);

      AVStreams::protocolSpec protocols (1);
      protocols.length (1);
      protocols [0] = CORBA::string_dup (this->producer_address_.in ());

      if (!this->ip_multicast_)
        {
          flow_consumer->set_protocol_restriction (protocols);

          // Both calls return strings owned by the caller; String_var
          // frees them when this scope is left.
          CORBA::String_var address =
            flow_consumer->go_to_listen (the_qos,
                                         1,
                                         flow_producer,
                                         this->fp_name_.inout ());
          CORBA::Boolean is_met;
          CORBA::String_var mcast_reply =
            flow_producer->connect_mcast (the_qos,
                                          is_met,
                                          address.in (),
                                          this->fp_name_.inout ());
        }
      else
        {
          // The spec says go_to_listen is called with the multicast
          // address returned from the connect_mcast call called
          // during add_producer. But go_to_listen doesn't have a
          // address parameter. I guess it should be connect_to_peer.
          // IP Multicasting.
          flow_consumer->connect_to_peer (the_qos,
                                          this->producer_address_.in (),
                                          this->fp_name_.inout ());
        }

      // @@ Is this the right place to do set_peer?
      AVStreams::flowSpec flow_spec;
      AVStreams::streamQoS stream_qos (1);
      stream_qos.length (1);
      stream_qos [0] = the_qos;
      this->mcastconfigif_->set_peer (flow_consumer, stream_qos, flow_spec);
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ( "TAO_FlowConnection::add_consumer");
      return 0;
    }
  return 1;
}
CORBA::Boolean TAO_FlowConnection::add_producer | ( | AVStreams::FlowProducer_ptr | flow_producer, | |
AVStreams::QoS & | the_qos | |||
) | [virtual] |
adds the producer to this flow connection.
Definition at line 3634 of file AVStreams_i.cpp.
{
  // Adds `producer` to this flow connection, asks it to connect to the
  // multicast address, and registers this connection plus the multicast
  // configuration interface as its peer.
  //
  // Returns 1 on success and also when the producer is already part of the
  // flow (a warning is logged).  Returns 0 when the multicast configuration
  // servant cannot be allocated or when a CORBA exception is raised.
  try
    {
      // @@Naga:Sometimes the same producer could be added with a different object reference.
      // There's no portable way of comparing obj refs. but we have to do this till we find
      // a permanent solution.For eg. 2 different flowproducers for the same flow in a
      // Multipoint to Multipoint binding will have the same flowname and hence cannot be
      // used for resolving ties.
      //
      // The check runs before a new reference is taken, so nothing has to
      // be released on this early-return path.
      FlowProducer_SetItor producer_end = this->flow_producer_set_.end ();
      for (FlowProducer_SetItor producer_it = this->flow_producer_set_.begin ();
           producer_it != producer_end;
           ++producer_it)
        {
          if ((*producer_it)->_is_equivalent (producer))
            {
              // producer exists in the set, a duplicate.
              ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
            }
        }

      // We need to check the return value of the insert into the flow producer
      // set, since multiconnect could be called many times which will lead to
      // a call to add_producer every time a sink is added. If the producer is already
      // present in our list we just return immediately.  The _var releases
      // the duplicated reference on that path.
      AVStreams::FlowProducer_var producer_ref =
        AVStreams::FlowProducer::_duplicate (producer);
      int const result = this->flow_producer_set_.insert (producer_ref.in ());
      if (result == 1)
        {
          // producer exists in the set, a duplicate.
          ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
        }
      // The set stores the raw pointer, so the reference now belongs to it.
      AVStreams::FlowProducer_ptr flow_producer = producer_ref._retn ();

      CORBA::Boolean met_qos;
      char mcast_address[BUFSIZ];
      if (this->producer_address_.in () == 0)
        {
          // No address known yet: build "<protocol>=<addr>" from the
          // configured multicast address and port.
          ACE_INET_Addr mcast_addr;
          mcast_addr.set (this->mcast_port_,
                          this->mcast_addr_.c_str ());
          ACE_TCHAR buf [BUFSIZ];
          mcast_addr.addr_to_string (buf, BUFSIZ);
          // Bounded formatting: protocol plus address could otherwise
          // exceed the fixed-size buffer.
          ACE_OS::snprintf (mcast_address,
                            sizeof (mcast_address),
                            "%s=%s",
                            this->protocol_.in (),
                            buf);
        }
      else
        {
          // Bounded, always NUL-terminated copy.
          ACE_OS::strsncpy (mcast_address,
                            this->producer_address_.in (),
                            sizeof (mcast_address));
        }

      // connect_mcast returns a string owned by the caller; the String_var
      // frees it unless ownership is handed to producer_address_ below.
      CORBA::String_var address =
        flow_producer->connect_mcast (the_qos,
                                      met_qos,
                                      mcast_address,
                                      this->fp_name_.in ());

      if (this->producer_address_.in () == 0)
        {
          TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address.in ());
          if (entry.address () != 0)
            {
              // Internet multicasting is in use.
              this->producer_address_ = address._retn ();
            }
          else
            {
              // ATM Multicasting is in use.
              // NOTE(review): nothing visible here ever sets ip_multicast_
              // to a non-zero value (the constructor also uses 0) - confirm
              // whether the Internet branch should enable it.
              this->ip_multicast_ = 0;
            }
        }

      // set the multicast peer.
      if (CORBA::is_nil (this->mcastconfigif_.in ()))
        {
          ACE_NEW_RETURN (this->mcastconfigif_i_,
                          TAO_MCastConfigIf,
                          0);
          this->mcastconfigif_ = this->mcastconfigif_i_->_this ();
        }
      AVStreams::FlowConnection_var flowconnection = this->_this ();
      flow_producer->set_Mcast_peer (flowconnection.in (),
                                     this->mcastconfigif_.in (),
                                     the_qos);
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ( "TAO_FlowConnection::add_producer");
      return 0;
    }
  return 1;
}
CORBA::Boolean TAO_FlowConnection::connect | ( | AVStreams::FlowProducer_ptr | flow_producer, | |
AVStreams::FlowConsumer_ptr | flow_consumer, | |||
AVStreams::QoS & | the_qos | |||
) | [virtual] |
Connect a flow producer and consumer under this flow connection.
Definition at line 3567 of file AVStreams_i.cpp.
{
  // Connects one producer and one consumer point-to-point under this flow
  // connection: both are recorded in the endpoint sets, introduced to each
  // other via set_peer, and then one side listens while the other connects.
  //
  // Returns 1 on success and 0 when a CORBA exception is raised, matching
  // the other operations of this class.
  try
    {
      AVStreams::FlowProducer_ptr flow_producer =
        AVStreams::FlowProducer::_duplicate (producer);
      AVStreams::FlowConsumer_ptr flow_consumer =
        AVStreams::FlowConsumer::_duplicate (consumer);

      // NOTE(review): the insert results are ignored; if an endpoint is
      // already in the set the duplicated reference is not stored - confirm
      // whether it should be released in that case.
      this->flow_producer_set_.insert (flow_producer);
      this->flow_consumer_set_.insert (flow_consumer);

      AVStreams::FlowConnection_var flowconnection = this->_this ();

      flow_producer->set_peer (flowconnection.in (),
                               flow_consumer,
                               the_qos);

      flow_consumer->set_peer (flowconnection.in (),
                               flow_producer,
                               the_qos);

      // go_to_listen returns a string owned by the caller; the String_var
      // frees it (and any value it is re-assigned from) automatically.
      CORBA::String_var consumer_address =
        flow_consumer->go_to_listen (the_qos,
                                     0, // false for is_mcast
                                     flow_producer,
                                     this->fp_name_.inout ());

      if (ACE_OS::strcmp (consumer_address.in (), "") == 0)
        {
          // Consumer is not willing to listen, so try the producer.
          consumer_address =
            flow_producer->go_to_listen (the_qos,
                                         0, // false for is_mcast
                                         flow_consumer,
                                         this->fp_name_.inout ());
          flow_consumer->connect_to_peer (the_qos,
                                          consumer_address.in (),
                                          this->fp_name_.inout ());
          // @@ Naga: We have to find means to set the reverse channel for the producer.
          // Its broken in the point-to_point case for UDP.
        }
      else
        {
          if (TAO_debug_level > 0)
            ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::connect_to_peer addres: %s", consumer_address.in ()));
          flow_producer->connect_to_peer (the_qos,
                                          consumer_address.in (),
                                          this->fp_name_.inout ());
        }
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ("TAO_FlowConnection::connect");
      // Report the failure instead of claiming the flow was connected.
      return 0;
    }
  return 1;
}
CORBA::Boolean TAO_FlowConnection::connect_devs | ( | AVStreams::FDev_ptr | a_party, | |
AVStreams::FDev_ptr | b_party, | |||
AVStreams::QoS & | the_qos | |||
) | [virtual] |
connect 2 Flow Devices.
Definition at line 3532 of file AVStreams_i.cpp.
{
  // Asks a_party for a producer and b_party for a consumer, both bound to
  // this flow connection, then connects the two.  Returns whatever
  // connect() reports, or 0 when a CORBA exception is raised.
  try
    {
      AVStreams::FlowConnection_var self_ref = this->_this ();
      CORBA::String_var fdev_name ((const char *)"");
      CORBA::Boolean qos_met;

      AVStreams::FlowProducer_var new_producer =
        a_party->create_producer (self_ref.in (),
                                  flow_qos,
                                  qos_met,
                                  fdev_name.inout ());

      AVStreams::FlowConsumer_var new_consumer =
        b_party->create_consumer (self_ref.in (),
                                  flow_qos,
                                  qos_met,
                                  fdev_name.inout ());

      return this->connect (new_producer.in (),
                            new_consumer.in (),
                            flow_qos);
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ( "TAO_FlowConnection::connect_devs");
    }
  // Only reached after an exception was reported above.
  return 0;
}
void TAO_FlowConnection::destroy | ( | void | ) | [virtual] |
destroy this flow.
Definition at line 3458 of file AVStreams_i.cpp.
{
  // Destroys every producer, then every consumer, and finally deactivates
  // this servant.  If any endpoint raises a CORBA exception it is printed
  // and the method returns without deactivating the servant.
  try
    {
      FlowProducer_SetItor producer_it = this->flow_producer_set_.begin ();
      FlowProducer_SetItor producer_stop = this->flow_producer_set_.end ();
      while (producer_it != producer_stop)
        {
          (*producer_it)->destroy ();
          ++producer_it;
        }

      FlowConsumer_SetItor consumer_it = this->flow_consumer_set_.begin ();
      FlowConsumer_SetItor consumer_stop = this->flow_consumer_set_.end ();
      while (consumer_it != consumer_stop)
        {
          (*consumer_it)->destroy ();
          ++consumer_it;
        }
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ("TAO_FlowConnection::destroy");
      return;
    }

  // A failed deactivation is only reported when debugging is enabled.
  if (TAO_AV_Core::deactivate_servant (this) < 0 && TAO_debug_level > 0)
    {
      ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::destroy failed\n"));
    }
}
CORBA::Boolean TAO_FlowConnection::disconnect | ( | void | ) | [virtual] |
CORBA::Boolean TAO_FlowConnection::drop | ( | AVStreams::FlowEndPoint_ptr | target | ) | [virtual] |
drops a flow endpoint from the flow.
Definition at line 3810 of file AVStreams_i.cpp.
{
  // Stub: the endpoint argument is ignored and 0 is always returned.
  ACE_UNUSED_ARG (target);
  CORBA::Boolean const dropped = 0;
  return dropped;
}
CORBA::Boolean TAO_FlowConnection::modify_QoS | ( | AVStreams::QoS & | new_qos | ) | [virtual] |
modify the QoS for this flow.
Definition at line 3491 of file AVStreams_i.cpp.
{
  // Stub: the requested QoS is ignored and 0 is always returned.
  ACE_UNUSED_ARG (new_qos);
  CORBA::Boolean const modified = 0;
  return modified;
}
void TAO_FlowConnection::push_event | ( | const AVStreams::streamEvent & | the_event | ) | [virtual] |
pushes an event, to be handled by the application.
Definition at line 3526 of file AVStreams_i.cpp.
{
  // Stub: the event is ignored; nothing is forwarded or stored.
  ACE_UNUSED_ARG (the_event);
}
int TAO_FlowConnection::set_mcast_addr | ( | ACE_CString | addr, | |
u_short | port | |||
) |
Definition at line 3367 of file AVStreams_i.cpp.
{
  // Records the multicast address and port that add_producer later uses
  // to build the producer's multicast address.  Always returns 0.
  this->mcast_addr_ = mcast_addr;
  this->mcast_port_ = mcast_port;

  return 0;
}
void TAO_FlowConnection::set_protocol | ( | const char * | protocol | ) |
Definition at line 3375 of file AVStreams_i.cpp.
{
  // Stores the protocol name; add_producer uses it as the prefix of the
  // "<protocol>=<address>" string it builds.
  this->protocol_ = protocol;
}
void TAO_FlowConnection::start | ( | void | ) | [virtual] |
start this flow.
Definition at line 3420 of file AVStreams_i.cpp.
{
  // Starts every consumer first and then every producer.
  // AVStreams::noSuchFlow and other CORBA exceptions are propagated to the
  // caller (the latter after being printed); any other exception is
  // reported on stdout and not propagated.
  try
    {
      FlowConsumer_SetItor consumer_it = this->flow_consumer_set_.begin ();
      FlowConsumer_SetItor consumer_stop = this->flow_consumer_set_.end ();
      while (consumer_it != consumer_stop)
        {
          (*consumer_it)->start ();
          ++consumer_it;
        }

      FlowProducer_SetItor producer_it = this->flow_producer_set_.begin ();
      FlowProducer_SetItor producer_stop = this->flow_producer_set_.end ();
      while (producer_it != producer_stop)
        {
          (*producer_it)->start ();
          ++producer_it;
        }
    }
  catch (const AVStreams::noSuchFlow&)
    {
      // Passed through untouched, without printing.
      throw;
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ("TAO_FlowConnection::start");
      throw;
    }
  catch(...)
    {
      printf ("TAO_FlowConnection::start - unknown exception\n");
    }
}
void TAO_FlowConnection::stop | ( | void | ) | [virtual] |
stop this flow.
Definition at line 3382 of file AVStreams_i.cpp.
{
  // Stops every producer first and then every consumer.
  // AVStreams::noSuchFlow and other CORBA exceptions are propagated to the
  // caller (the latter after being printed); any other exception is
  // reported on stdout and not propagated.
  try
    {
      FlowProducer_SetItor producer_it = this->flow_producer_set_.begin ();
      FlowProducer_SetItor producer_stop = this->flow_producer_set_.end ();
      while (producer_it != producer_stop)
        {
          (*producer_it)->stop ();
          ++producer_it;
        }

      FlowConsumer_SetItor consumer_it = this->flow_consumer_set_.begin ();
      FlowConsumer_SetItor consumer_stop = this->flow_consumer_set_.end ();
      while (consumer_it != consumer_stop)
        {
          (*consumer_it)->stop ();
          ++consumer_it;
        }
    }
  catch (const AVStreams::noSuchFlow&)
    {
      // Passed through untouched, without printing.
      throw;
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ("TAO_FlowConnection::stop");
      throw;
    }
  catch(...)
    {
      printf ("TAO_FlowConnection::stop - unknown exception\n");
    }
}
CORBA::Boolean TAO_FlowConnection::use_flow_protocol | ( | const char * | fp_name, | |
const CORBA::Any & | fp_settings | |||
) | [virtual] |
use the specified flow protocol for this flow.
Definition at line 3499 of file AVStreams_i.cpp.
{
  // Remembers the flow protocol name and settings, then forwards them to
  // every producer and every consumer in this flow.  Always returns 1;
  // exceptions raised by the endpoints are not caught here.
  this->fp_name_ = fp_name;
  this->fp_settings_ = fp_settings;

  FlowProducer_SetItor producer_it = this->flow_producer_set_.begin ();
  FlowProducer_SetItor producer_stop = this->flow_producer_set_.end ();
  while (producer_it != producer_stop)
    {
      (*producer_it)->use_flow_protocol (fp_name, fp_settings);
      ++producer_it;
    }

  FlowConsumer_SetItor consumer_it = this->flow_consumer_set_.begin ();
  FlowConsumer_SetItor consumer_stop = this->flow_consumer_set_.end ();
  while (consumer_it != consumer_stop)
    {
      (*consumer_it)->use_flow_protocol (fp_name, fp_settings);
      ++consumer_it;
    }

  return 1;
}
Definition at line 864 of file AVStreams_i.h.
The multicast address returned by the producer.
Definition at line 863 of file AVStreams_i.h.
CORBA::String_var TAO_FlowConnection::fp_name_ [protected] |
Definition at line 865 of file AVStreams_i.h.
CORBA::Any TAO_FlowConnection::fp_settings_ [protected] |
Definition at line 866 of file AVStreams_i.h.
int TAO_FlowConnection::ip_multicast_ [protected] |
IP Multicasting is used.
Definition at line 870 of file AVStreams_i.h.
ACE_CString TAO_FlowConnection::mcast_addr_ [protected] |
Definition at line 874 of file AVStreams_i.h.
u_short TAO_FlowConnection::mcast_port_ [protected] |
Definition at line 873 of file AVStreams_i.h.
AVStreams::MCastConfigIf_var TAO_FlowConnection::mcastconfigif_ [protected] |
Definition at line 872 of file AVStreams_i.h.
TAO_MCastConfigIf* TAO_FlowConnection::mcastconfigif_i_ [protected] |
Definition at line 871 of file AVStreams_i.h.
Definition at line 867 of file AVStreams_i.h.
CORBA::String_var TAO_FlowConnection::protocol_ [protected] |
Definition at line 875 of file AVStreams_i.h.