
TAO_StreamEndPoint Class Reference

The Stream EndPoint. Used to implement one endpoint of a stream that implements the transport layer.

#include <AVStreams_i.h>



Public Member Functions

 TAO_StreamEndPoint (void)
 Constructor.
virtual void stop (const AVStreams::flowSpec &the_spec)
 Stop the stream. An empty the_spec means all the flows.
virtual void start (const AVStreams::flowSpec &the_spec)
 Start the stream. An empty the_spec means all the flows.
virtual void destroy (const AVStreams::flowSpec &the_spec)
 Destroy the stream. An empty the_spec means all the flows.
virtual CORBA::Boolean connect (AVStreams::StreamEndPoint_ptr responder, AVStreams::streamQoS &qos_spec, const AVStreams::flowSpec &the_spec)
 Called by StreamCtrl. responder is the peer to connect to.
virtual CORBA::Boolean request_connection (AVStreams::StreamEndPoint_ptr initiator, CORBA::Boolean is_mcast, AVStreams::streamQoS &qos, AVStreams::flowSpec &the_spec)
virtual CORBA::Boolean modify_QoS (AVStreams::streamQoS &new_qos, const AVStreams::flowSpec &the_flows)
 Change the transport qos on a stream.
virtual int change_qos (AVStreams::streamQoS &new_qos, const AVStreams::flowSpec &the_flows)
virtual CORBA::Boolean set_protocol_restriction (const AVStreams::protocolSpec &the_pspec)
 Used to restrict the set of protocols.
virtual void disconnect (const AVStreams::flowSpec &the_spec)
 Disconnect the flows.
virtual void set_FPStatus (const AVStreams::flowSpec &the_spec, const char *fp_name, const CORBA::Any &fp_settings)
 Used to control the flow.
virtual CORBA::Object_ptr get_fep (const char *flow_name)
 Not implemented in the light profile, where it throws notSupported.
virtual char * add_fep (CORBA::Object_ptr the_fep)
 Not implemented in the light profile, where it throws notSupported.
virtual void remove_fep (const char *fep_name)
 Not implemented in the light profile, where it throws notSupported.
virtual void set_negotiator (AVStreams::Negotiator_ptr new_negotiator)
 Used to "attach" a negotiator to the endpoint.
virtual void set_key (const char *flow_name, const AVStreams::key &the_key)
 Used for public key encryption.
virtual void set_source_id (CORBA::Long source_id)
 Used to set a unique id for packets sent by this streamendpoint.
virtual ~TAO_StreamEndPoint (void)
 Destructor.
CORBA::Boolean multiconnect (AVStreams::streamQoS &the_qos, AVStreams::flowSpec &the_spec)

Protected Member Functions

char * add_fep_i (AVStreams::FlowEndPoint_ptr fep)
 Helper methods to implement add_fep().
char * add_fep_i_add_property (AVStreams::FlowEndPoint_ptr fep)
int translate_qos (const AVStreams::streamQoS &application_qos, AVStreams::streamQoS &network_qos)
 Translate from application-level to network-level QoS.

Protected Attributes

u_int flow_count_
u_int flow_num_
 current flow number used for system generation of flow names.
FlowEndPoint_Map fep_map_
 Hash table mapping flow names to their corresponding FlowEndPoint references.
AVStreams::flowSpec flows_
 sequence of supported flow names.
CORBA::Long source_id_
 source id used for multicast.
AVStreams::Negotiator_var negotiator_
 our local negotiator for QoS.
AVStreams::protocolSpec protocols_
 Our available list of protocols.
CORBA::String_var protocol_
 Chosen protocol for this stream endpoint, based on the AvailableProtocols property.
AVStreams::key key_
 Key used for encryption.
u_short mcast_port_
ACE_CString mcast_addr_
ACE_Hash_Map_Manager< ACE_CString, TAO_FlowSpec_Entry *, ACE_Null_Mutex > mcast_entry_map_
TAO_AV_FlowSpecSet forward_flow_spec_set
TAO_AV_FlowSpecSet reverse_flow_spec_set
AVStreams::StreamEndPoint_var peer_sep_
AVStreams::SFPStatus * sfp_status_
AVStreams::StreamCtrl_var streamctrl_

Detailed Description

The Stream EndPoint. Used to implement one endpoint of a stream that implements the transport layer.

Definition at line 456 of file AVStreams_i.h.


Constructor & Destructor Documentation

TAO_StreamEndPoint::TAO_StreamEndPoint ( void   ) 

Constructor.

Definition at line 1558 of file AVStreams_i.cpp.

  :flow_count_ (0),
   flow_num_ (0),
   mcast_port_ (ACE_DEFAULT_MULTICAST_PORT+1)
{
  //is->mcast_addr_ = ACE_OS::inet_addr (ACE_DEFAULT_MULTICAST_ADDR);
  this->mcast_addr_.set (ACE_DEFAULT_MULTICAST_ADDR);
  if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::TAO_StreamEndPoint::mcast_addr = %s", this->mcast_addr_.c_str ()));
  //  this->handle_open ();
}

TAO_StreamEndPoint::~TAO_StreamEndPoint ( void   )  [virtual]

Destructor.

Definition at line 2382 of file AVStreams_i.cpp.

{
  //this->handle_close ();
  TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
  TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();

  int i=0;
  // @@ Naga: Will the iterator always give the entries in the order of insertion.
  // or is it an implementation fact of ACE containers.
  for ( ; begin != end; ++begin, ++i)
    {
//       if (i >= FLOWSPEC_MAX)
//         {
          TAO_FlowSpec_Entry *entry = *begin;
          delete entry;
          //        }
    }
  begin = this->reverse_flow_spec_set.begin ();
  end = this->reverse_flow_spec_set.end ();
  i = 0;
  for (; begin != end; ++begin)
    {
//       if (i >= FLOWSPEC_MAX)
//         {
          TAO_FlowSpec_Entry *entry = *begin;
          delete entry;
          //        }
    }
}


Member Function Documentation

char * TAO_StreamEndPoint::add_fep ( CORBA::Object_ptr  the_fep  )  [virtual]

Not implemented in the light profile, where it throws notSupported.

Definition at line 2259 of file AVStreams_i.cpp.

{
  AVStreams::FlowEndPoint_var fep =
    AVStreams::FlowEndPoint::_narrow (fep_obj);

  CORBA::String_var flow_name =
    this->add_fep_i (fep.in ());

  try
    {
      fep->lock ();
      // Add it to the sequence of flowNames supported.
      // put the flowname and the flowendpoint in a hashtable.
      ACE_CString fep_name_key (CORBA::string_dup (flow_name.in ()));
      if (this->fep_map_.bind (fep_name_key, AVStreams::FlowEndPoint::_duplicate (fep.in ())) != 0)
        {
          throw AVStreams::streamOpFailed ();
        }
      // increment the flow count.
      this->flow_count_++;
      this->flows_.length (this->flow_count_);
      this->flows_[this->flow_count_-1] = flow_name;
      // define/modify the "Flows" property.
      CORBA::Any flows_any;
      flows_any <<= this->flows_;
      this->define_property ("Flows",
                             flows_any);
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ("TAO_StreamEndPoint::add_fep");
      return 0;
    }
  return flow_name._retn ();
}
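
The bookkeeping in the listing above (bind the flow name to its endpoint, then append the name to the sequence of supported flows) can be sketched without any CORBA types. This is a minimal illustration only: FlowRegistry and its int payload are hypothetical stand-ins for fep_map_ and a FlowEndPoint reference, and the "Flows" property update is left out.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical registry; the int payload stands in for a FlowEndPoint reference.
class FlowRegistry
{
public:
  // Returns false when the name is already bound, leaving the state untouched.
  bool add (const std::string &flow_name, int endpoint)
  {
    if (!fep_map_.emplace (flow_name, endpoint).second)
      return false;
    flows_.push_back (flow_name);  // keep the name list in step with the map
    return true;
  }

  const std::vector<std::string> &flows () const { return flows_; }

private:
  std::unordered_map<std::string, int> fep_map_;
  std::vector<std::string> flows_;
};
```

Deriving the flow count from flows_.size () rather than keeping a separate counter is a design choice of this sketch, not something the listing does.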

char * TAO_StreamEndPoint::add_fep_i ( AVStreams::FlowEndPoint_ptr  fep  )  [protected]

Helper methods to implement add_fep().

Definition at line 2238 of file AVStreams_i.cpp.

{
  CORBA::String_var flow_name;
  try
    {
      CORBA::Any_var flow_name_any =
        fep->get_property_value ("FlowName");

      const char *tmp = 0;
      flow_name_any >>= tmp;
      flow_name = CORBA::string_dup (tmp);
    }
  catch (const CORBA::Exception&)
    {
      flow_name =
        this->add_fep_i_add_property (fep);
    }
  return flow_name._retn ();
}

char * TAO_StreamEndPoint::add_fep_i_add_property ( AVStreams::FlowEndPoint_ptr  fep  )  [protected]

Definition at line 2211 of file AVStreams_i.cpp.

{
  ACE_CString flow_name;

  try
    {
      // exception implies the flow name is not defined and is system
      // generated.
      flow_name = "flow";
      char tmp[255];
      ACE_OS::sprintf (tmp, "%u", this->flow_num_++);
      flow_name += tmp;

      CORBA::Any flowname_any;
      flowname_any <<= flow_name.c_str ();
      fep->define_property ("Flow",
                            flowname_any);
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ("TAO_StreamEndPoint::add_fep");
      return 0;
    }
  return ACE_OS::strdup( flow_name.c_str () );
}
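
The naming step above concatenates the prefix "flow" with a running counter. A minimal standalone sketch of that step, assuming a hypothetical FlowNamer class in place of the flow_num_ member and std::string in place of the fixed-size buffer:

```cpp
#include <string>

// Hypothetical stand-in for the naming step in add_fep_i_add_property():
// each call returns "flow" followed by the current counter, then bumps it.
class FlowNamer
{
public:
  std::string next ()
  {
    return "flow" + std::to_string (flow_num_++);
  }

private:
  unsigned int flow_num_ = 0;  // plays the role of flow_num_ in the listing
};
```

Successive calls on one FlowNamer yield "flow0", "flow1", and so on.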

int TAO_StreamEndPoint::change_qos ( AVStreams::streamQoS &  new_qos,
const AVStreams::flowSpec &  the_flows 
) [virtual]

Definition at line 2104 of file AVStreams_i.cpp.

{
  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,
                "TAO_StreamEndPoint::change_qos\n"));

  TAO_AV_QoS qos (new_qos);
  for (int i = 0; (unsigned) i < the_flows.length (); i++)
    {
      TAO_Forward_FlowSpec_Entry entry;
      entry.parse (the_flows [i]);
      ACE_CString flow_name_key (entry.flowname ());
      Flow_Handler_Map_Entry *handler_entry;
      if (this->flow_handler_map_.find (flow_name_key,
                                        handler_entry) == 0)
        {
          AVStreams::QoS flow_qos;
          if (qos.get_flow_qos (entry.flowname (), flow_qos) != 0)
            ACE_DEBUG ((LM_DEBUG,
                        "New QoS for the flow %s is not specified\n",
                        entry.flowname ()));
          int result;
          result = handler_entry->int_id_->change_qos (flow_qos);
          if (result != 0)
            ACE_ERROR_RETURN ((LM_ERROR,
                               "Modifying QoS Failed\n"),
                              -1);

        }
    }
  return 0;
}
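
The control flow of the listing above (look up a handler per flow, skip flows that have none, stop at the first failure) can be sketched in isolation. The names QosHandler and change_qos_for are hypothetical, and a std::function returning int stands in for the handler's change_qos call.

```cpp
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical handler type: returns 0 on success, non-zero on failure.
using QosHandler = std::function<int ()>;

// Applies a QoS change to each named flow that has a handler.
// Flows without a handler are skipped; the first failure yields -1.
int change_qos_for (const std::map<std::string, QosHandler> &handlers,
                    const std::vector<std::string> &flows)
{
  for (const std::string &name : flows)
    {
      auto it = handlers.find (name);
      if (it == handlers.end ())
        continue;          // unknown flow: nothing to modify
      if (it->second () != 0)
        return -1;         // stop at the first handler that fails
    }
  return 0;
}
```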

CORBA::Boolean TAO_StreamEndPoint::connect ( AVStreams::StreamEndPoint_ptr  responder,
AVStreams::streamQoS &  qos_spec,
const AVStreams::flowSpec &  the_spec 
) [virtual]

Called by StreamCtrl. responder is the peer to connect to.

Definition at line 1571 of file AVStreams_i.cpp.

{
  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect ()\n"));
  CORBA::Boolean retv = 0;
  this->peer_sep_ = AVStreams::StreamEndPoint::_duplicate (responder);
  try
    {
      if (!CORBA::is_nil (this->negotiator_.in ()))
        {
          ACE_DEBUG ((LM_DEBUG,
                      "NEGOTIATOR AVIALABLE\n"));

          CORBA::Any_var negotiator_any = responder->get_property_value ("Negotiator");

          AVStreams::Negotiator_ptr peer_negotiator;
          negotiator_any.in () >>= peer_negotiator;
          if (!CORBA::is_nil (peer_negotiator))
            {
              CORBA::Boolean result =
                this->negotiator_->negotiate (peer_negotiator,
                                              qos);
              if (!result)
                if (TAO_debug_level > 0)
                  ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect (): negotiate failed\n"));
            }
        }
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ("TAO_StreamEndPoint::negotiate");
    }

  try
    {
      if (this->protocols_.length () > 0)
        {
          // choose protocols based on what the remote endpoint can support.
          CORBA::Any_var protocols_any =
            responder->get_property_value ("AvailableProtocols");
          AVStreams::protocolSpec peer_protocols;
          AVStreams::protocolSpec *temp_protocols = 0;
          protocols_any.in () >>= temp_protocols;
          peer_protocols = *temp_protocols;
          for (u_int i=0;i<peer_protocols.length ();i++)
            {
              for (u_int j=0;j<this->protocols_.length ();j++)
                if (ACE_OS::strcmp (peer_protocols [i],
                                    this->protocols_[j]) == 0)
                  {
                    // we'll agree upon the first protocol that matches.
                    this->protocol_ = CORBA::string_dup (peer_protocols [i]);
                    break;
                  }
            }
        }
    }
  catch (const CORBA::Exception&)
    {
      if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Availableprotocols property not defined\n"));
    }
  try
    {
      AVStreams::streamQoS network_qos;
      if (qos.length () > 0)
        {
          if (TAO_debug_level > 0)
            ACE_DEBUG ((LM_DEBUG,
                        "QoS is Specified\n"));

          int result = this->translate_qos (qos,
                                            network_qos);
          if (result != 0)
            if (TAO_debug_level > 0)
              ACE_DEBUG ((LM_DEBUG,
                          "QoS translation failed\n"));

          this->qos ().set (network_qos);
        }


      AVStreams::flowSpec flow_spec (the_spec);
      this->handle_preconnect (flow_spec);

      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG,
                    "TAO_StreamEndPoint::connect: flow_spec_length = %d\n",
                    flow_spec.length ()));
      u_int i;
      for (i=0;i<flow_spec.length ();i++)
        {
          TAO_Forward_FlowSpec_Entry *entry = 0;
          ACE_NEW_RETURN (entry,
                          TAO_Forward_FlowSpec_Entry,
                          0);

          if (entry->parse (flow_spec[i]) == -1)
            return 0;

          if (TAO_debug_level > 0)
            ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect: %s\n",  entry->entry_to_string ()));

          this->forward_flow_spec_set.insert (entry);
        }

      int result =TAO_AV_CORE::instance ()->init_forward_flows (this,
                                                                this->forward_flow_spec_set,
                                                                TAO_AV_Core::TAO_AV_ENDPOINT_A,
                                                                flow_spec);


      if (result < 0)
        ACE_ERROR_RETURN ((LM_ERROR, "%N:%l TAO_AV_Core::init_forward_flows failed\n"), 0);


      AVStreams::StreamEndPoint_var streamendpoint = this->_this ();

      retv = responder->request_connection (streamendpoint.in (),
                                            0,
                                            network_qos,
                                            flow_spec);

      if (TAO_debug_level > 0)
         ACE_DEBUG ((LM_DEBUG, "%N:%l request_connection returned %d\n", retv));

      if (retv == 0)
        return retv;
      for (i=0;i<flow_spec.length ();i++)
        {
          TAO_Reverse_FlowSpec_Entry *entry = 0;
          ACE_NEW_RETURN (entry,
                          TAO_Reverse_FlowSpec_Entry,
                          0);
          if (entry->parse (flow_spec[i]) == -1)
            ACE_ERROR_RETURN ((LM_ERROR,
                               "Reverse_Flow_Spec_Set::parse failed\n"),
                              0);

          if (TAO_debug_level > 0)
            ACE_DEBUG ((LM_DEBUG,
                        "TAO_StreamEndPoint::Connect: Reverse Flow Spec %s\n",
                        entry->entry_to_string ()));

          this->reverse_flow_spec_set.insert (entry);
        }

      result = TAO_AV_CORE::instance ()->init_reverse_flows (this,
                                                             this->forward_flow_spec_set,
                                                             this->reverse_flow_spec_set,
                                                             TAO_AV_Core::TAO_AV_ENDPOINT_A);
      if (result < 0)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "TAO_AV_Core::init_reverse_flows failed\n"),
                          0);

      // Make the upcall to the app
      retv = this->handle_postconnect (flow_spec);
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ("TAO_StreamEndPoint::connect");
      return 0;
    }
  return retv;
}
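
The protocol selection in the listing above is commented as agreeing on the first protocol that matches. Note that the break there leaves only the inner loop, so the outer loop keeps running and a later match can overwrite protocol_. A minimal sketch of a strict first-match selection, using std::string in place of the CORBA string sequences (choose_protocol is a hypothetical name):

```cpp
#include <string>
#include <vector>

// Returns the first peer protocol that also appears in the local list,
// or an empty string when the two lists share no entry.
std::string choose_protocol (const std::vector<std::string> &peer_protocols,
                             const std::vector<std::string> &local_protocols)
{
  for (const std::string &peer : peer_protocols)
    for (const std::string &local : local_protocols)
      if (peer == local)
        return peer;  // leaves both loops at once
  return std::string ();
}
```

Returning from the function, rather than breaking, is what makes the first match final in this sketch.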

void TAO_StreamEndPoint::destroy ( const AVStreams::flowSpec &  the_spec  )  [virtual]

Destroy the stream. An empty the_spec means all the flows.

Definition at line 1892 of file AVStreams_i.cpp.

{
  CORBA::Any_var vdev_any = this->get_property_value ("Related_VDev");

  AVStreams::VDev_ptr vdev;

  vdev_any.in() >>= vdev;
  CORBA::Any_var mc_any = vdev->get_property_value ("Related_MediaCtrl");

  // The Related_MediaCtrl property was inserted as a CORBA::Object, so we
  // must extract it as the same type.
  CORBA::Object_var obj;
  mc_any.in() >>= CORBA::Any::to_object( obj.out() );

  AVStreams::MediaControl_var media_ctrl =
          AVStreams::MediaControl::_narrow( obj.in() );

  // deactivate the associated vdev and media ctrl

  if ( !CORBA::is_nil( vdev ) )
  {
    PortableServer::ServantBase_var vdev_servant =
        TAO_AV_CORE::instance()->poa()->reference_to_servant ( vdev );
    TAO_AV_Core::deactivate_servant (vdev_servant.in());
  }

  if ( !CORBA::is_nil ( media_ctrl.in () ) )
  {
    PortableServer::ServantBase_var mc_servant =
        TAO_AV_CORE::instance()->poa()->reference_to_servant (media_ctrl.in());
    TAO_AV_Core::deactivate_servant (mc_servant.in());
  }

  int result = TAO_AV_Core::deactivate_servant (this);
  if (result < 0)
    if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::destroy failed\n"));

  if (flow_spec.length () > 0)
    {
      for (u_int i=0;i<flow_spec.length ();i++)
        {
          {
            TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
            for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
                 begin != end; ++begin)
              {
                TAO_FlowSpec_Entry *entry = *begin;
                TAO_Tokenizer flow_name (flow_spec [i], '\\');
                if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
                  {
                    if (entry->protocol_object ())
                      {
                        entry->protocol_object ()->destroy ();
                      }
                    break;
                  }
              }
          }
          {
            TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
            for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
                 begin != end; ++begin)
              {
                TAO_FlowSpec_Entry *entry = *begin;
                TAO_Tokenizer flow_name (flow_spec [i], '\\');
                if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
                  {
                    if (entry->protocol_object ())
                      {
                        entry->protocol_object ()->destroy ();
                      }
                    break;
                  }
              }
          }
        }
    }
  else
    {
      {
        TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
        for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
             begin != end; ++begin)
          {
            TAO_FlowSpec_Entry *entry = *begin;
            if (entry->protocol_object ())
              {
                entry->protocol_object ()->stop ();

                ACE_CString control_flowname =
                    TAO_AV_Core::get_control_flowname (entry->flowname ());
                TAO_AV_CORE::instance()->remove_acceptor(entry->flowname());
                TAO_AV_CORE::instance()->remove_acceptor(control_flowname.c_str());

                entry->protocol_object ()->destroy ();
              }
          }
      }
      {
        TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
        for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
             begin != end; ++begin)
          {
            TAO_FlowSpec_Entry *entry = *begin;
            if (entry->protocol_object ())
              {
                entry->protocol_object ()->stop ();

                ACE_CString control_flowname =
                    TAO_AV_Core::get_control_flowname (entry->flowname ());
                TAO_AV_CORE::instance()->remove_connector(entry->flowname());
                TAO_AV_CORE::instance()->remove_connector(control_flowname.c_str());
                entry->protocol_object ()->destroy ();

              }
          }
      }
    }

  // Make the upcall into the app
  //  this->handle_destroy (the_spec);
  //
}
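
In the per-flow branch above, each flow spec entry is tokenized on a backslash and the first token is compared with the entry's flow name. A minimal sketch of that extraction with std::string, assuming only that the flow name is the text before the first backslash; flow_name_of is a hypothetical helper, and TAO_Tokenizer's handling of edge cases is not reproduced.

```cpp
#include <string>

// Returns the text before the first backslash, i.e. the first token of a
// backslash-separated flow spec entry; the whole string if none is present.
std::string flow_name_of (const std::string &flow_spec_entry)
{
  // find() yields npos when there is no backslash, and substr(0, npos)
  // then returns the entire string.
  return flow_spec_entry.substr (0, flow_spec_entry.find ('\\'));
}
```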

void TAO_StreamEndPoint::disconnect ( const AVStreams::flowSpec &  the_spec  )  [virtual]

Disconnect the flows.

Definition at line 2181 of file AVStreams_i.cpp.

{
  ACE_UNUSED_ARG (the_spec);
}

CORBA::Object_ptr TAO_StreamEndPoint::get_fep ( const char *  flow_name  )  [virtual]

Not implemented in the light profile, where it throws notSupported.

Definition at line 2201 of file AVStreams_i.cpp.

{
  ACE_CString fep_name_key (flow_name);
  AVStreams::FlowEndPoint_var fep_entry;
  if (this->fep_map_.find (fep_name_key, fep_entry) == 0)
    return fep_entry._retn();
  return 0;
}

CORBA::Boolean TAO_StreamEndPoint::modify_QoS ( AVStreams::streamQoS &  new_qos,
const AVStreams::flowSpec &  the_flows 
) [virtual]

Change the transport qos on a stream.

Definition at line 2140 of file AVStreams_i.cpp.

{
  if (TAO_debug_level > 0)
  ACE_DEBUG ((LM_DEBUG,
              "TAO_StreamEndPoint::modify_QoS\n"));

  int result =  this->change_qos (new_qos, the_flows);

  if (result != 0)
    return 0;

  return 1;

}

CORBA::Boolean TAO_StreamEndPoint::multiconnect ( AVStreams::streamQoS &  the_qos,
AVStreams::flowSpec &  the_spec 
)

Reimplemented in TAO_StreamEndPoint_A and TAO_StreamEndPoint_B.

Definition at line 2374 of file AVStreams_i.cpp.

{
  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::multiconnect\n"));
  return 0;
}

void TAO_StreamEndPoint::remove_fep ( const char *  fep_name  )  [virtual]

Not implemented in the light profile, where it throws notSupported.

Definition at line 2297 of file AVStreams_i.cpp.

{
  try
    {
      ACE_CString fep_name_key (flow_name);
      AVStreams::FlowEndPoint_var fep_entry;
      // Remove the fep from the hash table.
      if (this->fep_map_.unbind (fep_name_key, fep_entry)!= 0)
        throw AVStreams::streamOpFailed ();
      // redefine the "Flows" property
      AVStreams::flowSpec new_flows (this->flows_.length ());
      new_flows.length(this->flows_.length ());
      for (u_int i=0, j=0 ; i <this->flows_.length (); i++)
        if (ACE_OS::strcmp (flow_name, this->flows_[i]) != 0)
          new_flows[j++] = this->flows_[i];

      CORBA::Any flows;
      flows <<= new_flows;
      this->flows_ = new_flows;
      this->define_property ("Flows",
                             flows);
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ("TAO_StreamEndPoint::remove_fep");
    }
}
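
The listing above rebuilds the flow name sequence without the removed name. As listed, new_flows keeps the original length after the copy, so one trailing slot is left over. A minimal sketch of the same filtering with std::vector, which shrinks the container instead (remove_flow_name is a hypothetical helper, not part of the class):

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: drops every occurrence of flow_name from flows and
// returns how many entries were removed. The vector shrinks accordingly.
std::size_t remove_flow_name (std::vector<std::string> &flows,
                              const std::string &flow_name)
{
  const std::size_t before = flows.size ();
  flows.erase (std::remove (flows.begin (), flows.end (), flow_name),
               flows.end ());
  return before - flows.size ();
}
```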

CORBA::Boolean TAO_StreamEndPoint::request_connection ( AVStreams::StreamEndPoint_ptr  initiator,
CORBA::Boolean  is_mcast,
AVStreams::streamQoS &  qos,
AVStreams::flowSpec &  the_spec 
) [virtual]

Called by the peer StreamEndPoint. The flow spec indicates the flows, which contain transport addresses and related details.

Definition at line 2019 of file AVStreams_i.cpp.

{
  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,
                "\n(%P|%t) TAO_StreamEndPoint::request_connection called"));

  int result = 0;
  try
    {
      AVStreams::streamQoS network_qos;
      if (qos.length () > 0)
        {
         if (TAO_debug_level > 0)
          ACE_DEBUG ((LM_DEBUG,
                      "QoS is Specified\n"));

          int result = this->translate_qos (qos, network_qos);
          if (result != 0)
            if (TAO_debug_level > 0)
              ACE_DEBUG ((LM_DEBUG, "QoS translation failed\n"));

          this->qos ().set (network_qos);
        }

      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG,
                    "\n(%P|%t) TAO_StreamEndPoint::request_connection: "
                    "flowspec has length = %d and the strings are:\n",
                    flow_spec.length ()));
      CORBA::ULong i;

      for (i=0;i<flow_spec.length ();i++)
        {
          TAO_Forward_FlowSpec_Entry *entry = 0;
          ACE_NEW_RETURN (entry,
                          TAO_Forward_FlowSpec_Entry,
                          0);

          CORBA::String_var string_entry = CORBA::string_dup (flow_spec[i]);

          if(TAO_debug_level > 0)
             ACE_DEBUG(( LM_DEBUG,
                         "%N:%l Parsing flow spec: [%s]\n",
                         string_entry.in ()));

          if (entry->parse (string_entry.in ()) == -1)
          {
            if (TAO_debug_level > 0)
              ACE_DEBUG ((LM_DEBUG,
                          "%N:%l Error parsing flow_spec: [%s]\n",
                          string_entry.in ()));
            return 0;
          }
          if (TAO_debug_level > 0)
            ACE_DEBUG ((LM_DEBUG,
                        "TAO_StreamEndPoint::request_connection flow spec [%s]\n",
                        entry->entry_to_string ()));

          this->forward_flow_spec_set.insert (entry);
        }

      result = TAO_AV_CORE::instance ()->init_forward_flows (this,
                                                             this->forward_flow_spec_set,
                                                             TAO_AV_Core::TAO_AV_ENDPOINT_B,
                                                             flow_spec);

      if (result < 0)
        return 0;

      // Make the upcall to the app
      result = this->handle_connection_requested (flow_spec);
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ("TAO_StreamEndpoint::request_connection");
      return 0;
    }
  return result;
}

void TAO_StreamEndPoint::set_FPStatus ( const AVStreams::flowSpec &  the_spec,
const char *  fp_name,
const CORBA::Any &  fp_settings 
) [virtual]

Used to control the flow.

Definition at line 2189 of file AVStreams_i.cpp.

{
  if (ACE_OS::strcmp (fp_name, "SFP1.0") != 0)
    return;
  fp_settings >>= this->sfp_status_;
  // @@Naga: We should call set_FPStatus on all the protocol objects.
}

void TAO_StreamEndPoint::set_key ( const char *  flow_name,
const AVStreams::key &  the_key 
) [virtual]

Used for public key encryption.

Definition at line 2347 of file AVStreams_i.cpp.

{
  try
    {
      this->key_ = the_key;
      CORBA::Any PublicKey;
      PublicKey <<= the_key;
      char PublicKey_property [BUFSIZ];
      ACE_OS::sprintf (PublicKey_property, "%s_PublicKey", flow_name);
      this->define_property (PublicKey_property,
                             PublicKey);
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception ("TAO_StreamEndPoint::set_key");
    }
}
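
The listing above formats the property name into a fixed BUFSIZ buffer with sprintf, so a very long flow name could exceed the buffer. A minimal sketch of the same naming rule with std::string, which has no fixed bound (public_key_property is a hypothetical helper):

```cpp
#include <string>

// Builds the per-flow property name used for the public key:
// the flow name followed by the suffix "_PublicKey".
std::string public_key_property (const std::string &flow_name)
{
  return flow_name + "_PublicKey";
}
```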

void TAO_StreamEndPoint::set_negotiator ( AVStreams::Negotiator_ptr  new_negotiator  )  [virtual]

Used to "attach" a negotiator to the endpoint.

Definition at line 2327 of file AVStreams_i.cpp.

{
  try
    {
      CORBA::Any negotiator;
      negotiator <<= new_negotiator;
      this->define_property ("Negotiator",
                             negotiator);
      this->negotiator_ = AVStreams::Negotiator::_duplicate (new_negotiator);
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception (
        "TAO_StreamEndPoint::set_negotiator");
    }
}

CORBA::Boolean TAO_StreamEndPoint::set_protocol_restriction ( const AVStreams::protocolSpec &  the_pspec  )  [virtual]

Used to restrict the set of protocols.

Definition at line 2159 of file AVStreams_i.cpp.

{
  try
    {
      CORBA::Any protocol_restriction_any;

      protocol_restriction_any <<= protocols;
      this->define_property ("ProtocolRestriction",
                             protocol_restriction_any);
      this->protocols_ = protocols;
    }
  catch (const CORBA::Exception& ex)
    {
      ex._tao_print_exception (
        "TAO_StreamEndPoint::set_protocol_restriction");
      return 0;
    }
  return 1;
}

void TAO_StreamEndPoint::set_source_id ( CORBA::Long  source_id  )  [virtual]

Used to set a unique id for packets sent by this streamendpoint.

Definition at line 2368 of file AVStreams_i.cpp.

{
  this->source_id_ = source_id;
}

void TAO_StreamEndPoint::start ( const AVStreams::flowSpec &  the_spec  )  [virtual]

Start the stream. An empty the_spec means all the flows.

Definition at line 1805 of file AVStreams_i.cpp.

{
  if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::start\n"));
  // Make the upcall into the app
  this->handle_start (flow_spec);

  if (flow_spec.length () > 0)
    {
      // Now call start on all the flow handlers.
      for (u_int i=0;i<flow_spec.length ();i++)
        {
          TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
          for (TAO_AV_FlowSpecSetItor forward_begin = this->forward_flow_spec_set.begin ();
               forward_begin != end; ++forward_begin)
            {
              TAO_FlowSpec_Entry *entry = *forward_begin;
              if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
                {
                  //                  entry->protocol_object ()->start ();
                  if (entry->handler () != 0)
                  {
                    entry->handler ()->start (entry->role ());
                  }
                  if (entry->control_handler () != 0)
                  {
                    entry->control_handler ()->start (entry->role ());
                  }
                }
            }

          end = this->reverse_flow_spec_set.end ();
          for (TAO_AV_FlowSpecSetItor reverse_begin = this->reverse_flow_spec_set.begin ();
               reverse_begin != end; ++reverse_begin)
            {
              TAO_FlowSpec_Entry *entry = *reverse_begin;
              if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
                {
                  //                  entry->protocol_object ()->start ();
                  if (entry->handler () != 0)
                  {
                    entry->handler ()->start (entry->role ());
                  }
                  if (entry->control_handler () != 0)
                  {
                    entry->control_handler ()->start (entry->role ());
                  }
                }
            }
        }
    }
  else
    {
      TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
      for (TAO_AV_FlowSpecSetItor forwardbegin = this->forward_flow_spec_set.begin ();
           forwardbegin != end; ++forwardbegin)
        {
          TAO_FlowSpec_Entry *entry = *forwardbegin;
          if (entry->handler () != 0)
            {
              entry->handler ()->start (entry->role ());
            }
          if (entry->control_handler () != 0)
            {
              entry->control_handler ()->start (entry->role ());
            }
        }

      end = this->reverse_flow_spec_set.end ();
      for (TAO_AV_FlowSpecSetItor reversebegin = this->reverse_flow_spec_set.begin ();
           reversebegin != end; ++reversebegin)
        {
          TAO_FlowSpec_Entry *entry = *reversebegin;
          //          entry->protocol_object ()->start ();
          if (entry->handler () != 0)
            {
              entry->handler ()->start (entry->role ());
            }
          if (entry->control_handler () != 0)
            {
              entry->control_handler ()->start (entry->role ());
            }
        }
    }
}
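
The rule that an empty flow spec addresses every flow, while a non-empty one addresses only the named flows, can be sketched on plain strings. This is an illustration under simplifying assumptions: select_flows is a hypothetical helper, flows are represented by their names only, and the handler calls are omitted.

```cpp
#include <string>
#include <vector>

// Returns the names of the known flows an operation should touch:
// all of them when requested is empty, otherwise only the matching ones.
std::vector<std::string> select_flows (const std::vector<std::string> &known,
                                       const std::vector<std::string> &requested)
{
  if (requested.empty ())
    return known;

  std::vector<std::string> selected;
  for (const std::string &name : requested)
    for (const std::string &flow : known)
      if (flow == name)
        selected.push_back (flow);
  return selected;
}
```

Requested names that match no known flow are silently ignored, as in the listing.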

void TAO_StreamEndPoint::stop ( const AVStreams::flowSpec &  the_spec  )  [virtual]

Stop the stream. An empty the_spec means all the flows.

Definition at line 1757 of file AVStreams_i.cpp.

{
  // Make the upcall into the app
  this->handle_stop (flow_spec);

  if (flow_spec.length () > 0)
    {

      for (u_int i=0;i<flow_spec.length ();i++)
        {
          TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
          for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
               begin != end; ++begin)
            {
              TAO_Forward_FlowSpec_Entry entry;
              entry.parse (flow_spec[i]);
              if (ACE_OS::strcmp ((*begin)->flowname (), entry.flowname ()) == 0)
               {
                 TAO_FlowSpec_Entry *entry = *begin;
                 //                  (*begin)->protocol_object ()->stop ();
                 if (entry->handler() != 0)
                   entry->handler ()->stop (entry->role ());
                 if (entry->control_handler () != 0)
                   entry->control_handler ()->stop (entry->role ());
                 break;
               }
            }
        }
    }
  else
    {
      TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
      for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
           begin != end; ++begin)
        {
          TAO_FlowSpec_Entry *entry = *begin;
          //          entry->protocol_object ()->stop ();
          if (entry->handler() != 0)
            entry->handler ()->stop (entry->role ());
          if (entry->control_handler () != 0)
            entry->control_handler ()->stop (entry->role ());
        }
    }
}

int TAO_StreamEndPoint::translate_qos ( const AVStreams::streamQoS &  application_qos,
AVStreams::streamQoS &  network_qos 
) [protected]

Translate from application-level to network-level QoS.

Definition at line 1740 of file AVStreams_i.cpp.

{
  u_int len = application_qos.length ();
  network_qos.length (len);
  for (u_int i=0;i<len;i++)
    {
      network_qos [i].QoSType = application_qos [i].QoSType;
      network_qos [i].QoSParams = application_qos [i].QoSParams;
    }
  return 0;
}


Member Data Documentation

FlowEndPoint_Map TAO_StreamEndPoint::fep_map_ [protected]

Hash table mapping flow names to their corresponding FlowEndPoint references.

Definition at line 546 of file AVStreams_i.h.

u_int TAO_StreamEndPoint::flow_count_ [protected]

Count of the number of flows in this streamendpoint, used to generate unique names for the flows.

Definition at line 540 of file AVStreams_i.h.

u_int TAO_StreamEndPoint::flow_num_ [protected]

current flow number used for system generation of flow names.

Definition at line 543 of file AVStreams_i.h.

AVStreams::flowSpec TAO_StreamEndPoint::flows_ [protected]

Sequence of supported flow names.

Definition at line 549 of file AVStreams_i.h.

TAO_AV_FlowSpecSet TAO_StreamEndPoint::forward_flow_spec_set [protected]

Definition at line 571 of file AVStreams_i.h.

AVStreams::key TAO_StreamEndPoint::key_ [protected]

Key used for encryption.

Definition at line 564 of file AVStreams_i.h.

ACE_CString TAO_StreamEndPoint::mcast_addr_ [protected]

Definition at line 569 of file AVStreams_i.h.

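ACE_Hash_Map_Manager< ACE_CString, TAO_FlowSpec_Entry *, ACE_Null_Mutex > TAO_StreamEndPoint::mcast_entry_map_ [protected]

Definition at line 570 of file AVStreams_i.h.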

u_short TAO_StreamEndPoint::mcast_port_ [protected]

TAO_Forward_FlowSpec_Entry forward_entries_ [FLOWSPEC_MAX]; TAO_Reverse_FlowSpec_Entry reverse_entries_ [FLOWSPEC_MAX];

Definition at line 568 of file AVStreams_i.h.

AVStreams::Negotiator_var TAO_StreamEndPoint::negotiator_ [protected]

our local negotiator for QoS.

Definition at line 555 of file AVStreams_i.h.

AVStreams::StreamEndPoint_var TAO_StreamEndPoint::peer_sep_ [protected]

Definition at line 573 of file AVStreams_i.h.

CORBA::String_var TAO_StreamEndPoint::protocol_ [protected]

Chosen protocol for this stream endpoint, based on the AvailableProtocols property.

Definition at line 561 of file AVStreams_i.h.

AVStreams::protocolSpec TAO_StreamEndPoint::protocols_ [protected]

Our available list of protocols.

Definition at line 558 of file AVStreams_i.h.

TAO_AV_FlowSpecSet TAO_StreamEndPoint::reverse_flow_spec_set [protected]

Definition at line 572 of file AVStreams_i.h.

AVStreams::SFPStatus * TAO_StreamEndPoint::sfp_status_ [protected]

Definition at line 574 of file AVStreams_i.h.

CORBA::Long TAO_StreamEndPoint::source_id_ [protected]

Source id used for multicast.

Definition at line 552 of file AVStreams_i.h.

AVStreams::StreamCtrl_var TAO_StreamEndPoint::streamctrl_ [protected]

Definition at line 575 of file AVStreams_i.h.


The documentation for this class was generated from the following files: AVStreams_i.h and AVStreams_i.cpp.