Public Member Functions | Protected Types | Protected Attributes

TAO_FlowConnection Class Reference

This class currently supports only one producer and one consumer per flow. More...

#include <AVStreams_i.h>

Collaboration diagram for TAO_FlowConnection:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 TAO_FlowConnection (void)
 default constructor.
virtual void stop (void)
 stop this flow.
virtual void start (void)
 start this flow.
virtual void destroy (void)
 destroy this flow.
virtual CORBA::Boolean modify_QoS (AVStreams::QoS &new_qos)
 modify the QoS for this flow.
virtual CORBA::Boolean use_flow_protocol (const char *fp_name, const CORBA::Any &fp_settings)
 use the specified flow protocol for this flow.
virtual void push_event (const AVStreams::streamEvent &the_event)
 pushes an event , to be handled by the application.
virtual CORBA::Boolean connect_devs (AVStreams::FDev_ptr a_party, AVStreams::FDev_ptr b_party, AVStreams::QoS &the_qos)
 connect 2 Flow Devices.
virtual CORBA::Boolean connect (AVStreams::FlowProducer_ptr flow_producer, AVStreams::FlowConsumer_ptr flow_consumer, AVStreams::QoS &the_qos)
 Connect a flow producer and consumer under this flow connection.
virtual CORBA::Boolean disconnect (void)
 disconnect this flow connection.
virtual CORBA::Boolean add_producer (AVStreams::FlowProducer_ptr flow_producer, AVStreams::QoS &the_qos)
 adds the producer to this flow connection.
virtual CORBA::Boolean add_consumer (AVStreams::FlowConsumer_ptr flow_consumer, AVStreams::QoS &the_qos)
 adds a consumer to this flow connection.
virtual CORBA::Boolean drop (AVStreams::FlowEndPoint_ptr target)
 drops a flow endpoint from the flow.
int set_mcast_addr (ACE_CString addr, u_short port)
void set_protocol (const char *protocol)

Protected Types

typedef ACE_Unbounded_Set
< AVStreams::FlowProducer_ptr > 
FlowProducer_Set
typedef
ACE_Unbounded_Set_Iterator
< AVStreams::FlowProducer_ptr > 
FlowProducer_SetItor
typedef ACE_Unbounded_Set
< AVStreams::FlowConsumer_ptr > 
FlowConsumer_Set
typedef
ACE_Unbounded_Set_Iterator
< AVStreams::FlowConsumer_ptr > 
FlowConsumer_SetItor

Protected Attributes

FlowProducer_Set flow_producer_set_
 The multicast address returned by the producer.
FlowConsumer_Set flow_consumer_set_
CORBA::String_var fp_name_
CORBA::Any fp_settings_
CORBA::String_var producer_address_
int ip_multicast_
 IP Multicasting is used.
TAO_MCastConfigIfmcastconfigif_i_
AVStreams::MCastConfigIf_var mcastconfigif_
u_short mcast_port_
ACE_CString mcast_addr_
CORBA::String_var protocol_

Detailed Description

This class currently supports only one producer and one consumer per flow.

Definition at line 801 of file AVStreams_i.h.


Member Typedef Documentation

typedef ACE_Unbounded_Set<AVStreams::FlowConsumer_ptr> TAO_FlowConnection::FlowConsumer_Set [protected]

Definition at line 859 of file AVStreams_i.h.

typedef ACE_Unbounded_Set_Iterator<AVStreams::FlowConsumer_ptr> TAO_FlowConnection::FlowConsumer_SetItor [protected]

Definition at line 860 of file AVStreams_i.h.

typedef ACE_Unbounded_Set<AVStreams::FlowProducer_ptr> TAO_FlowConnection::FlowProducer_Set [protected]

Definition at line 857 of file AVStreams_i.h.

typedef ACE_Unbounded_Set_Iterator<AVStreams::FlowProducer_ptr> TAO_FlowConnection::FlowProducer_SetItor [protected]

Definition at line 858 of file AVStreams_i.h.


Constructor & Destructor Documentation

TAO_FlowConnection::TAO_FlowConnection ( void   ) 

default constructor.

Definition at line 3352 of file AVStreams_i.cpp.


Member Function Documentation

CORBA::Boolean TAO_FlowConnection::add_consumer ( AVStreams::FlowConsumer_ptr  flow_consumer,
AVStreams::QoS the_qos 
) [virtual]

adds a consumer to this flow connection.

Definition at line 3723 of file AVStreams_i.cpp.

{
  try
    {
      AVStreams::FlowConsumer_ptr flow_consumer =
        AVStreams::FlowConsumer::_duplicate (consumer);
      FlowConsumer_SetItor begin = this->flow_consumer_set_.begin ();
      FlowConsumer_SetItor end = this->flow_consumer_set_.end ();
      for (; begin != end; ++begin)
        {
          if ((*begin)->_is_equivalent (consumer))
            // Consumer exists in the set, a duplicate.
            ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_Consumer: Consumer already exists\n"), 1);
        }
      int result = this->flow_consumer_set_.insert (flow_consumer);
      if (result == 1)
        {
          // consumer exists in the set, a duplicate.
          ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_consumer: consumer already exists\n"), 1);
        }

      FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin ();
      // @@Lets take that the first entry as the only producer. We're
      // not sure if we can have multiple flow producers in a
      // flowconnection. We can have multiple producer in the MtM binding,
      // in which case the first producer that gets added is the leader.
      AVStreams::FlowProducer_ptr flow_producer = (*producer_begin);

      AVStreams::protocolSpec protocols (1);
      protocols.length (1);
      protocols [0] = CORBA::string_dup (this->producer_address_.in ());

      if (!this->ip_multicast_)
        {
          flow_consumer->set_protocol_restriction (protocols);
          char * address =
            flow_consumer->go_to_listen (the_qos,
                                         1,
                                         flow_producer,
                                         this->fp_name_.inout ());
          CORBA::Boolean is_met;
          flow_producer->connect_mcast (the_qos,
                                        is_met,
                                        address,
                                        this->fp_name_.inout ());
        }
       else
        {
          // The spec says go_to_listen is called with the multicast
          // address returned from the connect_mcast call called
          // during add_producer. But go_to_listen doesn't have a
          // address parameter. I guess it should be connect_to_peer.
          // IP Multicasting.
          flow_consumer->connect_to_peer (the_qos,
                                          this->producer_address_.in (),
                                          this->fp_name_.inout ());

          //  char * address =
          //             flow_consumer->go_to_listen (the_qos,
          //                                          1,
          //                                          flow_producer,
          //                                          this->fp_name_.inout ()
          //);

        }
      if (CORBA::is_nil (this->mcastconfigif_.in ()))
        ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowConnection::add_consumer: first add a producer and then a consumer\n"), 0);
      // @@ Is this the right place to do set_peer?
      AVStreams::flowSpec flow_spec;
      AVStreams::streamQoS stream_qos (1);
      stream_qos.length (1);
      stream_qos [0] = the_qos;
      this->mcastconfigif_->set_peer (flow_consumer,
                                      stream_qos,
                                      flow_spec);
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception (
        "TAO_FlowConnection::add_consumer");
      return 0;
    }
  return 1;
}

CORBA::Boolean TAO_FlowConnection::add_producer ( AVStreams::FlowProducer_ptr  flow_producer,
AVStreams::QoS the_qos 
) [virtual]

adds the producer to this flow connection.

Definition at line 3634 of file AVStreams_i.cpp.

{
  try
    {
          AVStreams::FlowProducer_ptr flow_producer =
            AVStreams::FlowProducer::_duplicate (producer);
          // @@Naga:Sometimes the same producer could be added with a different object reference.
          // There's no portable way of comparing obj refs. but we have to do this till we find
          // a permanent solution.For eg. 2 different flowproducers for the same flow in a
          // Multipoint to Multipoint binding will have the same flowname and hence cannot be
          // used for resolving ties.
          FlowProducer_SetItor begin = this->flow_producer_set_.begin ();
          FlowProducer_SetItor end = this->flow_producer_set_.end ();
          for (; begin != end; ++begin)
            {
              if ((*begin)->_is_equivalent (producer))
              // producer exists in the set, a duplicate.
              ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
            }
          // We need to check the return value of the insert into the flow producer
          // set, since multiconnect could be called many times which will lead to
          // a call to add_producer every time a sink is added. If the producer is already
          // present in our list we just return immediately.
          int result = this->flow_producer_set_.insert (flow_producer);
          if (result == 1)
            {
              // producer exists in the set, a duplicate.
              ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1);
            }
          CORBA::Boolean met_qos;
          char mcast_address[BUFSIZ];
          if (this->producer_address_.in () == 0)
            {
              ACE_INET_Addr mcast_addr;
              mcast_addr.set (this->mcast_port_,
                              this->mcast_addr_.c_str ()
                              );

              ACE_TCHAR buf [BUFSIZ];
              mcast_addr.addr_to_string (buf, BUFSIZ);
              ACE_OS::sprintf (mcast_address, "%s=%s", this->protocol_.in (), buf);
            }
          else
            {
              ACE_OS::strcpy (mcast_address, this->producer_address_.in ());
            }
          char *address = flow_producer->connect_mcast (the_qos,
                                                        met_qos,
                                                        mcast_address,
                                                        this->fp_name_.in ());

          if (this->producer_address_.in () == 0)
            {
              TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address);
              if (entry.address () != 0)
                {
                  // Internet multicasting is in use.
                  this->producer_address_ = address;
                }
              else
                {
                  // ATM Multicasting is in use.
                  this->ip_multicast_ = 0;
                }
            }
          // set the multicast peer.
          if (CORBA::is_nil (this->mcastconfigif_.in ()))
            {
              ACE_NEW_RETURN (this->mcastconfigif_i_,
                              TAO_MCastConfigIf,
                              0);
              this->mcastconfigif_ = this->mcastconfigif_i_->_this ();
            }
          AVStreams::FlowConnection_var flowconnection = this->_this ();
          flow_producer->set_Mcast_peer (flowconnection.in (),
                                         this->mcastconfigif_.in (),
                                         the_qos);
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception (
        "TAO_FlowConnection::add_producer");
      return 0;
    }
  return 1;
}

CORBA::Boolean TAO_FlowConnection::connect ( AVStreams::FlowProducer_ptr  flow_producer,
AVStreams::FlowConsumer_ptr  flow_consumer,
AVStreams::QoS the_qos 
) [virtual]

Connect a flow producer and consumer under this flow connection.

Definition at line 3567 of file AVStreams_i.cpp.

{
  try
    {

      AVStreams::FlowProducer_ptr flow_producer =
        AVStreams::FlowProducer::_duplicate (producer);
      AVStreams::FlowConsumer_ptr flow_consumer =
        AVStreams::FlowConsumer::_duplicate (consumer);

      this->flow_producer_set_.insert (flow_producer);
      this->flow_consumer_set_.insert (flow_consumer);
      AVStreams::FlowConnection_var flowconnection =
        this->_this ();

      flow_producer->set_peer (flowconnection.in (),
                               flow_consumer,
                               the_qos);

      flow_consumer->set_peer (flowconnection.in (),
                               flow_producer,
                               the_qos);

      char *consumer_address =
        flow_consumer->go_to_listen (the_qos,
                                     0, // false for is_mcast
                                     flow_producer,
                                     this->fp_name_.inout ());

      if (ACE_OS::strcmp (consumer_address, "") == 0)
        {
          // Consumer is not willing to listen, so try the producer.
          consumer_address = flow_producer->go_to_listen (the_qos,
                                                          0, // false for is_mcast
                                                          flow_consumer,
                                                          this->fp_name_.inout ());
          flow_consumer->connect_to_peer (the_qos,
                                          consumer_address,
                                          this->fp_name_.inout ());
          // @@ Naga: We have to find means to set the reverse channel for the producer.
          // Its broken in the point-to_point case for UDP.
        }
      else
        {
          if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::connect_to_peer addres: %s", consumer_address));
          flow_producer->connect_to_peer (the_qos,
                                          consumer_address,
                                          this->fp_name_.inout ());
        }
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ("TAO_FlowConnection::connect");
    }
  return 1;
}

CORBA::Boolean TAO_FlowConnection::connect_devs ( AVStreams::FDev_ptr  a_party,
AVStreams::FDev_ptr  b_party,
AVStreams::QoS the_qos 
) [virtual]

connect 2 Flow Devices.

Definition at line 3532 of file AVStreams_i.cpp.

{
  CORBA::Boolean result = 0;
  try
    {
      AVStreams::FlowConnection_var flowconnection = this->_this ();
      CORBA::Boolean met_qos;
      CORBA::String_var named_fdev ((const char *)"");
      AVStreams::FlowProducer_var producer =
        a_party->create_producer (flowconnection.in (),
                                  flow_qos,
                                  met_qos,
                                  named_fdev.inout ());
      AVStreams::FlowConsumer_var consumer =
        b_party->create_consumer (flowconnection.in (),
                                  flow_qos,
                                  met_qos,
                                  named_fdev.inout ());
      result = this->connect (producer.in (),
                              consumer.in (),
                              flow_qos);
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception (
        "TAO_FlowConnection::connect_devs");
      return 0;
    }
  return result;
}

void TAO_FlowConnection::destroy ( void   )  [virtual]

destroy this flow.

Definition at line 3458 of file AVStreams_i.cpp.

{
  try
    {
      FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
        ();
      for (FlowProducer_SetItor producer_end =
             this->flow_producer_set_.end ();
           producer_begin != producer_end; ++producer_begin)
        {
          (*producer_begin)->destroy ();
        }
      FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
        ();
      for (FlowConsumer_SetItor consumer_end =
             this->flow_consumer_set_.end ();
           consumer_begin != consumer_end; ++consumer_begin)
        {
          (*consumer_begin)->destroy ();
        }
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ("TAO_FlowConnection::destroy");
      return;
    }
  int result = TAO_AV_Core::deactivate_servant (this);
  if (result < 0)
    if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::destroy failed\n"));
}

CORBA::Boolean TAO_FlowConnection::disconnect ( void   )  [virtual]

disconnect this flow connection.

Definition at line 3628 of file AVStreams_i.cpp.

{
  return  0;
}

CORBA::Boolean TAO_FlowConnection::drop ( AVStreams::FlowEndPoint_ptr  target  )  [virtual]

drops a flow endpoint from the flow.

Definition at line 3810 of file AVStreams_i.cpp.

{
  ACE_UNUSED_ARG (target);
  return 0;
}

CORBA::Boolean TAO_FlowConnection::modify_QoS ( AVStreams::QoS new_qos  )  [virtual]

modify the QoS for this flow.

Definition at line 3491 of file AVStreams_i.cpp.

{
  ACE_UNUSED_ARG (new_qos);
  return 0;
}

void TAO_FlowConnection::push_event ( const AVStreams::streamEvent the_event  )  [virtual]

pushes an event , to be handled by the application.

Definition at line 3526 of file AVStreams_i.cpp.

{
  ACE_UNUSED_ARG (the_event);
}

int TAO_FlowConnection::set_mcast_addr ( ACE_CString  addr,
u_short  port 
)

Definition at line 3367 of file AVStreams_i.cpp.

{
  this->mcast_addr_ = mcast_addr;
  this->mcast_port_ = mcast_port;
  return 0;
}

void TAO_FlowConnection::set_protocol ( const char *  protocol  ) 

Definition at line 3375 of file AVStreams_i.cpp.

{
  this->protocol_ = protocol;
}

void TAO_FlowConnection::start ( void   )  [virtual]

start this flow.

Definition at line 3420 of file AVStreams_i.cpp.

{
  try
    {
      FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
        ();
      for (FlowConsumer_SetItor consumer_end =
             this->flow_consumer_set_.end ();
           consumer_begin != consumer_end; ++consumer_begin)
        {
          (*consumer_begin)->start ();
        }
      FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
        ();
      for (FlowProducer_SetItor producer_end =
             this->flow_producer_set_.end ();
           producer_begin != producer_end; ++producer_begin)
        {
          (*producer_begin)->start ();
        }
    }
  catch (const AVStreams::noSuchFlow&)
    {
      throw; 
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ("TAO_FlowConnection::start");
      throw; 
    }
  catch(...)
    {
      printf ("TAO_FlowConnection::start - unknown exception\n");
    }
}

void TAO_FlowConnection::stop ( void   )  [virtual]

stop this flow.

Definition at line 3382 of file AVStreams_i.cpp.

{
  try
    {
      FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
        ();
      for (FlowProducer_SetItor producer_end =
             this->flow_producer_set_.end ();
           producer_begin != producer_end; ++producer_begin)
        {
          (*producer_begin)->stop ();
        }
      FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
        ();
      for (FlowConsumer_SetItor consumer_end =
             this->flow_consumer_set_.end ();
           consumer_begin != consumer_end; ++consumer_begin)
        {
          (*consumer_begin)->stop ();
        }
    }
  catch (const AVStreams::noSuchFlow&)
    {
      throw;
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ("TAO_FlowConnection::stop");
      throw;  
    }
  catch(...)
    {
      printf ("TAO_FlowConnection::stop - unknown exception\n");
    }
}

CORBA::Boolean TAO_FlowConnection::use_flow_protocol ( const char *  fp_name,
const CORBA::Any fp_settings 
) [virtual]

use the specified flow protocol for this flow.

Definition at line 3499 of file AVStreams_i.cpp.

{
  this->fp_name_ = fp_name;
  this->fp_settings_ = fp_settings;
  FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin
    ();
  for (FlowProducer_SetItor producer_end =
         this->flow_producer_set_.end ();
       producer_begin != producer_end; ++producer_begin)
    {
      (*producer_begin)->use_flow_protocol
        (fp_name, fp_settings);
    }
  FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin
    ();
  for (FlowConsumer_SetItor consumer_end =
         this->flow_consumer_set_.end ();
       consumer_begin != consumer_end; ++consumer_begin)
    {
      (*consumer_begin)->use_flow_protocol
        (fp_name, fp_settings);
    }
  return 1;
}


Member Data Documentation

Definition at line 864 of file AVStreams_i.h.

The multicast address returned by the producer.

Definition at line 863 of file AVStreams_i.h.

Definition at line 865 of file AVStreams_i.h.

Definition at line 866 of file AVStreams_i.h.

IP Multicasting is used.

Definition at line 870 of file AVStreams_i.h.

Definition at line 874 of file AVStreams_i.h.

u_short TAO_FlowConnection::mcast_port_ [protected]

Definition at line 873 of file AVStreams_i.h.

AVStreams::MCastConfigIf_var TAO_FlowConnection::mcastconfigif_ [protected]

Definition at line 872 of file AVStreams_i.h.

Definition at line 871 of file AVStreams_i.h.

Definition at line 867 of file AVStreams_i.h.

Definition at line 875 of file AVStreams_i.h.


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines