TAO_StreamEndPoint Class Reference

The Stream EndPoint. Used to implement one endpoint of a stream that implements the transport layer. More...

#include <AVStreams_i.h>

Inheritance diagram for TAO_StreamEndPoint:

Inheritance graph
[legend]
Collaboration diagram for TAO_StreamEndPoint:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 TAO_StreamEndPoint (void)
 Constructor.

virtual void stop (const AVStreams::flowSpec &the_spec)
 Stop the stream. Empty the_spec means, for all the flows.

virtual void start (const AVStreams::flowSpec &the_spec)
 Start the stream, Empty the_spec means, for all the flows.

virtual void destroy (const AVStreams::flowSpec &the_spec)
 Destroy the stream, Empty the_spec means, for all the flows.

virtual CORBA::Boolean connect (AVStreams::StreamEndPoint_ptr responder, AVStreams::streamQoS &qos_spec, const AVStreams::flowSpec &the_spec)
 Called by StreamCtrl. responder is the peer to connect to.

virtual CORBA::Boolean request_connection (AVStreams::StreamEndPoint_ptr initiator, CORBA::Boolean is_mcast, AVStreams::streamQoS &qos, AVStreams::flowSpec &the_spec)
virtual CORBA::Boolean modify_QoS (AVStreams::streamQoS &new_qos, const AVStreams::flowSpec &the_flows)
 Change the transport qos on a stream.

virtual int change_qos (AVStreams::streamQoS &new_qos, const AVStreams::flowSpec &the_flows)
virtual CORBA::Boolean set_protocol_restriction (const AVStreams::protocolSpec &the_pspec)
 Used to restrict the set of protocols.

virtual void disconnect (const AVStreams::flowSpec &the_spec)
 disconnect the flows

virtual void set_FPStatus (const AVStreams::flowSpec &the_spec, const char *fp_name, const CORBA::Any &fp_settings)
 Used to control the flow.

virtual CORBA::Object_ptr get_fep (const char *flow_name)
 Not implemented in the light profile, throws notsupported.

virtual char * add_fep (CORBA::Object_ptr the_fep)
 Not implemented in the light profile, throws notsupported.

virtual void remove_fep (const char *fep_name)
 Not implemented in the light profile, throws notsupported.

virtual void set_negotiator (AVStreams::Negotiator_ptr new_negotiator)
 Used to "attach" a negotiator to the endpoint.

virtual void set_key (const char *flow_name, const AVStreams::key &the_key)
 Used for public key encryption.

virtual void set_source_id (CORBA::Long source_id)
 Used to set a unique id for packets sent by this streamendpoint.

virtual ~TAO_StreamEndPoint (void)
 Destructor.

CORBA::Boolean multiconnect (AVStreams::streamQoS &the_qos, AVStreams::flowSpec &the_spec)

Protected Member Functions

char * add_fep_i (AVStreams::FlowEndPoint_ptr fep)
 Helper methods to implement add_fep().

char * add_fep_i_add_property (AVStreams::FlowEndPoint_ptr fep)
int translate_qos (const AVStreams::streamQoS &application_qos, AVStreams::streamQoS &network_qos)
 translate from application level to network level qos.


Protected Attributes

u_int flow_count_
u_int flow_num_
 current flow number used for system generation of flow names.

FlowEndPoint_Map fep_map_
 hash table for the flownames and its corresponding flowEndpoint reference.

AVStreams::flowSpec flows_
 sequence of supported flow names.

CORBA::Long source_id_
 source id used for multicast.

AVStreams::Negotiator_var negotiator_
 our local negotiator for QoS.

AVStreams::protocolSpec protocols_
 Our available list of protocols.

CORBA::String_var protocol_
 Chosen protocol for this streamendpoint based on availableprotocols property.

AVStreams::key key_
 Key used for encryption.

u_short mcast_port_
ACE_CString mcast_addr_
ACE_Hash_Map_Manager< ACE_CString,
TAO_FlowSpec_Entry *, ACE_Null_Mutex
mcast_entry_map_
TAO_AV_FlowSpecSet forward_flow_spec_set
TAO_AV_FlowSpecSet reverse_flow_spec_set
AVStreams::StreamEndPoint_var peer_sep_
AVStreams::SFPStatussfp_status_
AVStreams::StreamCtrl_var streamctrl_

Detailed Description

The Stream EndPoint. Used to implement one endpoint of a stream that implements the transport layer.

Definition at line 456 of file AVStreams_i.h.


Constructor & Destructor Documentation

TAO_StreamEndPoint::TAO_StreamEndPoint void   ) 
 

Constructor.

Definition at line 1526 of file AVStreams_i.cpp.

References ACE_DEBUG, ACE_DEFAULT_MULTICAST_ADDR, ACE_DEFAULT_MULTICAST_PORT, LM_DEBUG, mcast_addr_, and TAO_debug_level.

01527   :flow_count_ (0),
01528    flow_num_ (0),
01529    mcast_port_ (ACE_DEFAULT_MULTICAST_PORT+1)
01530 {
01531   //is->mcast_addr_ = ACE_OS::inet_addr (ACE_DEFAULT_MULTICAST_ADDR);
01532   this->mcast_addr_.set (ACE_DEFAULT_MULTICAST_ADDR);
01533   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::TAO_StreamEndPoint::mcast_addr = %s", this->mcast_addr_.c_str ()));
01534   //  this->handle_open ();
01535 }

TAO_StreamEndPoint::~TAO_StreamEndPoint void   )  [virtual]
 

Destructor.

Definition at line 2349 of file AVStreams_i.cpp.

References ACE_Unbounded_Set< T >::begin(), ACE_Unbounded_Set< T >::end(), forward_flow_spec_set, reverse_flow_spec_set, and TAO_AV_FlowSpecSetItor.

02350 {
02351   //this->handle_close ();
02352   TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
02353   TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
02354 
02355   int i=0;
02356   // @@ Naga: Will the iterator always give the entries in the order of insertion.
02357   // or is it an implementation fact of ACE containers.
02358   for ( ; begin != end; ++begin, ++i)
02359     {
02360 //       if (i >= FLOWSPEC_MAX)
02361 //         {
02362           TAO_FlowSpec_Entry *entry = *begin;
02363           delete entry;
02364           //        }
02365     }
02366   begin = this->reverse_flow_spec_set.begin ();
02367   end = this->reverse_flow_spec_set.end ();
02368   i = 0;
02369   for (; begin != end; ++begin)
02370     {
02371 //       if (i >= FLOWSPEC_MAX)
02372 //         {
02373           TAO_FlowSpec_Entry *entry = *begin;
02374           delete entry;
02375           //        }
02376     }
02377 }


Member Function Documentation

char * TAO_StreamEndPoint::add_fep CORBA::Object_ptr  the_fep  )  [virtual]
 

Not implemented in the light profile, throws notsupported.

Definition at line 2227 of file AVStreams_i.cpp.

References ACE_CString, add_fep_i(), and fep_map_.

02228 {
02229   AVStreams::FlowEndPoint_var fep =
02230     AVStreams::FlowEndPoint::_narrow (fep_obj);
02231 
02232   CORBA::String_var flow_name =
02233     this->add_fep_i (fep.in ());
02234 
02235   try
02236     {
02237       fep->lock ();
02238       // Add it to the sequence of flowNames supported.
02239       // put the flowname and the flowendpoint in a hashtable.
02240       ACE_CString fep_name_key (CORBA::string_dup (flow_name.in ()));
02241       if (this->fep_map_.bind (fep_name_key, AVStreams::FlowEndPoint::_duplicate (fep.in ())) != 0)
02242         {
02243           throw AVStreams::streamOpFailed ();
02244         }
02245       // increment the flow count.
02246       this->flow_count_++;
02247       this->flows_.length (this->flow_count_);
02248       this->flows_[this->flow_count_-1] = flow_name;
02249       // define/modify the "Flows" property.
02250       CORBA::Any flows_any;
02251       flows_any <<= this->flows_;
02252       this->define_property ("Flows",
02253                              flows_any);
02254     }
02255   catch (const CORBA::Exception& ex)
02256     {
02257       ex._tao_print_exception ("TAO_StreamEndPoint::add_fep");
02258       return 0;
02259     }
02260   return flow_name._retn ();
02261 }

char * TAO_StreamEndPoint::add_fep_i AVStreams::FlowEndPoint_ptr  fep  )  [protected]
 

Helper methods to implement add_fep().

Definition at line 2206 of file AVStreams_i.cpp.

References add_fep_i_add_property(), and CORBA::string_dup().

Referenced by add_fep().

02207 {
02208   CORBA::String_var flow_name;
02209   try
02210     {
02211       CORBA::Any_var flow_name_any =
02212         fep->get_property_value ("FlowName");
02213 
02214       const char *tmp;
02215       flow_name_any >>= tmp;
02216       flow_name = CORBA::string_dup (tmp);
02217     }
02218   catch (const CORBA::Exception&)
02219     {
02220       flow_name =
02221         this->add_fep_i_add_property (fep);
02222     }
02223   return flow_name._retn ();
02224 }

char * TAO_StreamEndPoint::add_fep_i_add_property AVStreams::FlowEndPoint_ptr  fep  )  [protected]
 

Definition at line 2179 of file AVStreams_i.cpp.

References ACE_CString, ACE_OS::sprintf(), and ACE_OS::strdup().

Referenced by add_fep_i().

02180 {
02181   ACE_CString flow_name;
02182 
02183   try
02184     {
02185       // exception implies the flow name is not defined and is system
02186       // generated.
02187       flow_name = "flow";
02188       char tmp[255];
02189       ACE_OS::sprintf (tmp, "%u", this->flow_num_++);
02190       flow_name += tmp;
02191 
02192       CORBA::Any flowname_any;
02193       flowname_any <<= flow_name.c_str ();
02194       fep->define_property ("Flow",
02195                             flowname_any);
02196     }
02197   catch (const CORBA::Exception& ex)
02198     {
02199       ex._tao_print_exception ("TAO_StreamEndPoint::add_fep");
02200       return 0;
02201     }
02202   return ACE_OS::strdup( flow_name.c_str () );
02203 }

int TAO_StreamEndPoint::change_qos AVStreams::streamQoS new_qos,
const AVStreams::flowSpec the_flows
[virtual]
 

Definition at line 2072 of file AVStreams_i.cpp.

References ACE_CString, ACE_DEBUG, ACE_ERROR_RETURN, TAO_AV_Flow_Handler::change_qos(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_LOCK >::find(), Flow_Handler_Map_Entry, TAO_FlowSpec_Entry::flowname(), AVStreams::flowSpec, TAO_AV_QoS::get_flow_qos(), ACE_Hash_Map_Entry< ACE_CString, TAO_AV_Flow_Handler * >::int_id_, LM_DEBUG, LM_ERROR, TAO_Forward_FlowSpec_Entry::parse(), TAO_Base_StreamEndPoint::qos(), AVStreams::streamQoS, and TAO_debug_level.

Referenced by modify_QoS().

02074 {
02075   if (TAO_debug_level > 0)
02076     ACE_DEBUG ((LM_DEBUG,
02077                 "TAO_StreamEndPoint::change_qos\n"));
02078 
02079   TAO_AV_QoS qos (new_qos);
02080   for (int i = 0; (unsigned) i < the_flows.length (); i++)
02081     {
02082       TAO_Forward_FlowSpec_Entry entry;
02083       entry.parse (the_flows [i]);
02084       ACE_CString flow_name_key (entry.flowname ());
02085       Flow_Handler_Map_Entry *handler_entry;
02086       if (this->flow_handler_map_.find (flow_name_key,
02087                                         handler_entry) == 0)
02088         {
02089           AVStreams::QoS flow_qos;
02090           if (qos.get_flow_qos (entry.flowname (), flow_qos) != 0)
02091             ACE_DEBUG ((LM_DEBUG,
02092                         "New QoS for the flow %s is not specified\n",
02093                         entry.flowname ()));
02094           int result;
02095           result = handler_entry->int_id_->change_qos (flow_qos);
02096           if (result != 0)
02097             ACE_ERROR_RETURN ((LM_ERROR,
02098                                "Modifying QoS Failed\n"),
02099                               -1);
02100 
02101         }
02102     }
02103   return 0;
02104 }

CORBA::Boolean TAO_StreamEndPoint::connect AVStreams::StreamEndPoint_ptr  responder,
AVStreams::streamQoS qos_spec,
const AVStreams::flowSpec the_spec
[virtual]
 

Called by StreamCtrl. responder is the peer to connect to.

Definition at line 1539 of file AVStreams_i.cpp.

References ACE_DEBUG, ACE_ERROR_RETURN, ACE_NEW_RETURN, TAO_Reverse_FlowSpec_Entry::entry_to_string(), TAO_Forward_FlowSpec_Entry::entry_to_string(), AVStreams::flowSpec, forward_flow_spec_set, TAO_Base_StreamEndPoint::handle_postconnect(), TAO_Base_StreamEndPoint::handle_preconnect(), ACE_Unbounded_Set< T >::insert(), ACE_Singleton< TYPE, ACE_LOCK >::instance(), CORBA::is_nil(), LM_DEBUG, LM_ERROR, negotiator_, TAO_Reverse_FlowSpec_Entry::parse(), TAO_Forward_FlowSpec_Entry::parse(), peer_sep_, protocol_, protocols_, AVStreams::protocolSpec, TAO_Base_StreamEndPoint::qos(), reverse_flow_spec_set, TAO_AV_QoS::set(), ACE_OS::strcmp(), AVStreams::streamQoS, CORBA::string_dup(), TAO_debug_level, and translate_qos().

01542 {
01543   if (TAO_debug_level > 0)
01544     ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect ()\n"));
01545   CORBA::Boolean retv = 0;
01546   this->peer_sep_ = AVStreams::StreamEndPoint::_duplicate (responder);
01547   try
01548     {
01549       if (!CORBA::is_nil (this->negotiator_.in ()))
01550         {
01551           ACE_DEBUG ((LM_DEBUG,
01552                       "NEGOTIATOR AVIALABLE\n"));
01553 
01554           CORBA::Any_var negotiator_any = responder->get_property_value ("Negotiator");
01555 
01556           AVStreams::Negotiator_ptr peer_negotiator;
01557           negotiator_any.in () >>= peer_negotiator;
01558           if (!CORBA::is_nil (peer_negotiator))
01559             {
01560               CORBA::Boolean result =
01561                 this->negotiator_->negotiate (peer_negotiator,
01562                                               qos);
01563               if (!result)
01564                 if (TAO_debug_level > 0)
01565                   ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect (): negotiate failed\n"));
01566             }
01567         }
01568     }
01569   catch (const CORBA::Exception& ex)
01570     {
01571       ex._tao_print_exception ("TAO_StreamEndPoint::negotiate");
01572     }
01573 
01574   try
01575     {
01576       if (this->protocols_.length () > 0)
01577         {
01578           // choose protocols based on what the remote endpoint can support.
01579           CORBA::Any_var protocols_any =
01580             responder->get_property_value ("AvailableProtocols");
01581           AVStreams::protocolSpec peer_protocols;
01582           AVStreams::protocolSpec *temp_protocols;
01583           protocols_any.in () >>= temp_protocols;
01584           peer_protocols = *temp_protocols;
01585           for (u_int i=0;i<peer_protocols.length ();i++)
01586             {
01587               for (u_int j=0;j<this->protocols_.length ();j++)
01588                 if (ACE_OS::strcmp (peer_protocols [i],
01589                                     this->protocols_[j]) == 0)
01590                   {
01591                     // we'll agree upon the first protocol that matches.
01592                     this->protocol_ = CORBA::string_dup (peer_protocols [i]);
01593                     break;
01594                   }
01595             }
01596         }
01597     }
01598   catch (const CORBA::Exception&)
01599     {
01600       if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "Availableprotocols property not defined\n"));
01601     }
01602   try
01603     {
01604       AVStreams::streamQoS network_qos;
01605       if (qos.length () > 0)
01606         {
01607           if (TAO_debug_level > 0)
01608             ACE_DEBUG ((LM_DEBUG,
01609                         "QoS is Specified\n"));
01610 
01611           int result = this->translate_qos (qos,
01612                                             network_qos);
01613           if (result != 0)
01614             if (TAO_debug_level > 0)
01615               ACE_DEBUG ((LM_DEBUG,
01616                           "QoS translation failed\n"));
01617 
01618           this->qos ().set (network_qos);
01619         }
01620 
01621 
01622       AVStreams::flowSpec flow_spec (the_spec);
01623       this->handle_preconnect (flow_spec);
01624 
01625       if (TAO_debug_level > 0)
01626         ACE_DEBUG ((LM_DEBUG,
01627                     "TAO_StreamEndPoint::connect: flow_spec_length = %d\n",
01628                     flow_spec.length ()));
01629       u_int i;
01630       for (i=0;i<flow_spec.length ();i++)
01631         {
01632           TAO_Forward_FlowSpec_Entry *entry = 0;
01633           ACE_NEW_RETURN (entry,
01634                           TAO_Forward_FlowSpec_Entry,
01635                           0);
01636 
01637           if (entry->parse (flow_spec[i]) == -1)
01638             return 0;
01639 
01640           if (TAO_debug_level > 0)
01641             ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::Connect: %s\n",  entry->entry_to_string ()));
01642 
01643           this->forward_flow_spec_set.insert (entry);
01644         }
01645 
01646       int result =TAO_AV_CORE::instance ()->init_forward_flows (this,
01647                                                                 this->forward_flow_spec_set,
01648                                                                 TAO_AV_Core::TAO_AV_ENDPOINT_A,
01649                                                                 flow_spec);
01650 
01651 
01652       if (result < 0)
01653         ACE_ERROR_RETURN ((LM_ERROR, "%N:%l TAO_AV_Core::init_forward_flows failed\n"), 0);
01654 
01655 
01656       AVStreams::StreamEndPoint_var streamendpoint = this->_this ();
01657 
01658       retv = responder->request_connection (streamendpoint.in (),
01659                                             0,
01660                                             network_qos,
01661                                             flow_spec);
01662 
01663       if (TAO_debug_level > 0)
01664          ACE_DEBUG ((LM_DEBUG, "%N:%l request_connection returned %d\n", retv));
01665 
01666       if (retv == 0)
01667         return retv;
01668       for (i=0;i<flow_spec.length ();i++)
01669         {
01670           TAO_Reverse_FlowSpec_Entry *entry = 0;
01671           ACE_NEW_RETURN (entry,
01672                           TAO_Reverse_FlowSpec_Entry,
01673                           0);
01674           if (entry->parse (flow_spec[i]) == -1)
01675             ACE_ERROR_RETURN ((LM_ERROR,
01676                                "Reverse_Flow_Spec_Set::parse failed\n"),
01677                               0);
01678 
01679           if (TAO_debug_level > 0)
01680             ACE_DEBUG ((LM_DEBUG,
01681                         "TAO_StreamEndPoint::Connect: Reverse Flow Spec %s\n",
01682                         entry->entry_to_string ()));
01683 
01684           this->reverse_flow_spec_set.insert (entry);
01685         }
01686 
01687       result = TAO_AV_CORE::instance ()->init_reverse_flows (this,
01688                                                              this->forward_flow_spec_set,
01689                                                              this->reverse_flow_spec_set,
01690                                                              TAO_AV_Core::TAO_AV_ENDPOINT_A);
01691       if (result < 0)
01692         ACE_ERROR_RETURN ((LM_ERROR,
01693                            "TAO_AV_Core::init_reverse_flows failed\n"),
01694                           0);
01695 
01696       // Make the upcall to the app
01697       retv = this->handle_postconnect (flow_spec);
01698     }
01699   catch (const CORBA::Exception& ex)
01700     {
01701       ex._tao_print_exception ("TAO_StreamEndPoint::connect");
01702       return 0;
01703     }
01704   return retv;
01705 }

void TAO_StreamEndPoint::destroy const AVStreams::flowSpec the_spec  )  [virtual]
 

Destroy the stream, Empty the_spec means, for all the flows.

Definition at line 1860 of file AVStreams_i.cpp.

References ACE_CString, ACE_DEBUG, ACE_Unbounded_Set< T >::begin(), TAO_AV_Core::deactivate_servant(), TAO_AV_Protocol_Object::destroy(), ACE_Unbounded_Set< T >::end(), TAO_FlowSpec_Entry::flowname(), AVStreams::flowSpec, forward_flow_spec_set, TAO_AV_Core::get_control_flowname(), ACE_Singleton< TYPE, ACE_LOCK >::instance(), CORBA::is_nil(), LM_DEBUG, TAO_FlowSpec_Entry::protocol_object(), reverse_flow_spec_set, TAO_AV_Protocol_Object::stop(), ACE_OS::strcmp(), TAO_AV_FlowSpecSetItor, and TAO_debug_level.

01861 {
01862   CORBA::Any_var vdev_any = this->get_property_value ("Related_VDev");
01863 
01864   AVStreams::VDev_ptr vdev;
01865 
01866   vdev_any.in() >>= vdev;
01867   CORBA::Any_var mc_any = vdev->get_property_value ("Related_MediaCtrl");
01868 
01869   // The Related_MediaCtrl property was inserted as a CORBA::Object, so we
01870   // must extract it as the same type.
01871   CORBA::Object_var obj;
01872   mc_any.in() >>= CORBA::Any::to_object( obj.out() );
01873 
01874   AVStreams::MediaControl_var media_ctrl =
01875           AVStreams::MediaControl::_narrow( obj.in() );
01876 
01877   // deactivate the associated vdev and media ctrl
01878 
01879   if ( !CORBA::is_nil( vdev ) )
01880   {
01881     PortableServer::ServantBase_var vdev_servant =
01882         TAO_AV_CORE::instance()->poa()->reference_to_servant ( vdev );
01883     TAO_AV_Core::deactivate_servant (vdev_servant.in());
01884   }
01885 
01886   if ( !CORBA::is_nil ( media_ctrl.in () ) )
01887   {
01888     PortableServer::ServantBase_var mc_servant =
01889         TAO_AV_CORE::instance()->poa()->reference_to_servant (media_ctrl.in());
01890     TAO_AV_Core::deactivate_servant (mc_servant.in());
01891   }
01892 
01893   int result = TAO_AV_Core::deactivate_servant (this);
01894   if (result < 0)
01895     if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::destroy failed\n"));
01896 
01897   if (flow_spec.length () > 0)
01898     {
01899       for (u_int i=0;i<flow_spec.length ();i++)
01900         {
01901           {
01902             TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01903             for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01904                  begin != end; ++begin)
01905               {
01906                 TAO_FlowSpec_Entry *entry = *begin;
01907                 TAO_Tokenizer flow_name (flow_spec [i], '\\');
01908                 if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
01909                   {
01910                     if (entry->protocol_object ())
01911                       {
01912                         entry->protocol_object ()->destroy ();
01913                       }
01914                     break;
01915                   }
01916               }
01917           }
01918           {
01919             TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
01920             for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
01921                  begin != end; ++begin)
01922               {
01923                 TAO_FlowSpec_Entry *entry = *begin;
01924                 TAO_Tokenizer flow_name (flow_spec [i], '\\');
01925                 if (ACE_OS::strcmp (entry->flowname (), flow_name[0]) == 0)
01926                   {
01927                     if (entry->protocol_object ())
01928                       {
01929                         entry->protocol_object ()->destroy ();
01930                       }
01931                     break;
01932                   }
01933               }
01934           }
01935         }
01936     }
01937   else
01938     {
01939       {
01940         TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01941         for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01942              begin != end; ++begin)
01943           {
01944             TAO_FlowSpec_Entry *entry = *begin;
01945             if (entry->protocol_object ())
01946               {
01947                 entry->protocol_object ()->stop ();
01948 
01949                 ACE_CString control_flowname =
01950                     TAO_AV_Core::get_control_flowname (entry->flowname ());
01951                 TAO_AV_CORE::instance()->remove_acceptor(entry->flowname());
01952                 TAO_AV_CORE::instance()->remove_acceptor(control_flowname.c_str());
01953 
01954                 entry->protocol_object ()->destroy ();
01955               }
01956           }
01957       }
01958       {
01959         TAO_AV_FlowSpecSetItor end = this->reverse_flow_spec_set.end ();
01960         for (TAO_AV_FlowSpecSetItor begin = this->reverse_flow_spec_set.begin ();
01961              begin != end; ++begin)
01962           {
01963             TAO_FlowSpec_Entry *entry = *begin;
01964             if (entry->protocol_object ())
01965               {
01966                 entry->protocol_object ()->stop ();
01967 
01968                 ACE_CString control_flowname =
01969                     TAO_AV_Core::get_control_flowname (entry->flowname ());
01970                 TAO_AV_CORE::instance()->remove_connector(entry->flowname());
01971                 TAO_AV_CORE::instance()->remove_connector(control_flowname.c_str());
01972                 entry->protocol_object ()->destroy ();
01973 
01974               }
01975           }
01976       }
01977     }
01978 
01979   // Make the upcall into the app
01980   //  this->handle_destroy (the_spec);
01981   //
01982 }

void TAO_StreamEndPoint::disconnect const AVStreams::flowSpec the_spec  )  [virtual]
 

disconnect the flows

Definition at line 2149 of file AVStreams_i.cpp.

References AVStreams::flowSpec.

02150 {
02151   ACE_UNUSED_ARG (the_spec);
02152 }

CORBA::Object_ptr TAO_StreamEndPoint::get_fep const char *  flow_name  )  [virtual]
 

Not implemented in the light profile, throws notsupported.

Definition at line 2169 of file AVStreams_i.cpp.

References ACE_CString, and fep_map_.

02170 {
02171   ACE_CString fep_name_key (flow_name);
02172   AVStreams::FlowEndPoint_var fep_entry;
02173   if (this->fep_map_.find (fep_name_key, fep_entry) == 0)
02174     return fep_entry._retn();
02175   return 0;
02176 }

CORBA::Boolean TAO_StreamEndPoint::modify_QoS AVStreams::streamQoS new_qos,
const AVStreams::flowSpec the_flows
[virtual]
 

Change the transport qos on a stream.

Definition at line 2108 of file AVStreams_i.cpp.

References ACE_DEBUG, change_qos(), AVStreams::flowSpec, LM_DEBUG, AVStreams::streamQoS, and TAO_debug_level.

02110 {
02111   if (TAO_debug_level > 0)
02112   ACE_DEBUG ((LM_DEBUG,
02113               "TAO_StreamEndPoint::modify_QoS\n"));
02114 
02115   int result =  this->change_qos (new_qos, the_flows);
02116 
02117   if (result != 0)
02118     return 0;
02119 
02120   return 1;
02121 
02122 }

CORBA::Boolean TAO_StreamEndPoint::multiconnect AVStreams::streamQoS the_qos,
AVStreams::flowSpec the_spec
 

Reimplemented in TAO_StreamEndPoint_A, and TAO_StreamEndPoint_B.

Definition at line 2341 of file AVStreams_i.cpp.

References ACE_DEBUG, AVStreams::flowSpec, LM_DEBUG, AVStreams::streamQoS, and TAO_debug_level.

02343 {
02344   if (TAO_debug_level > 0)
02345     ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::multiconnect\n"));
02346   return 0;
02347 }

void TAO_StreamEndPoint::remove_fep const char *  fep_name  )  [virtual]
 

Not implemented in the light profile, throws notsupported.

Definition at line 2265 of file AVStreams_i.cpp.

References ACE_CString, fep_map_, AVStreams::flowSpec, and ACE_OS::strcmp().

02266 {
02267   try
02268     {
02269       ACE_CString fep_name_key (flow_name);
02270       AVStreams::FlowEndPoint_var fep_entry;
02271       // Remove the fep from the hash table.
02272       if (this->fep_map_.unbind (fep_name_key, fep_entry)!= 0)
02273         throw AVStreams::streamOpFailed ();
02274       // redefine the "Flows" property
02275       AVStreams::flowSpec new_flows (this->flows_.length ());
02276       for (u_int i=0, j=0 ; i <this->flows_.length (); i++)
02277         if (ACE_OS::strcmp (flow_name, this->flows_[i]) != 0)
02278           new_flows[j++] = this->flows_[i];
02279 
02280       CORBA::Any flows;
02281       flows <<= new_flows;
02282       this->flows_ = new_flows;
02283       this->define_property ("Flows",
02284                              flows);
02285     }
02286   catch (const CORBA::Exception& ex)
02287     {
02288       ex._tao_print_exception ("TAO_StreamEndPoint::remove_fep");
02289     }
02290 }

CORBA::Boolean TAO_StreamEndPoint::request_connection AVStreams::StreamEndPoint_ptr  initiator,
CORBA::Boolean  is_mcast,
AVStreams::streamQoS qos,
AVStreams::flowSpec the_spec
[virtual]
 

Called by the peer StreamEndPoint. The flow_spec indicates the flows (which contain transport addresses etc.)

Definition at line 1987 of file AVStreams_i.cpp.

References ACE_DEBUG, ACE_NEW_RETURN, TAO_Forward_FlowSpec_Entry::entry_to_string(), AVStreams::flowSpec, forward_flow_spec_set, TAO_Base_StreamEndPoint::handle_connection_requested(), ACE_Unbounded_Set< T >::insert(), ACE_Singleton< TYPE, ACE_LOCK >::instance(), LM_DEBUG, TAO_Forward_FlowSpec_Entry::parse(), TAO_Base_StreamEndPoint::qos(), TAO_AV_QoS::set(), AVStreams::streamQoS, CORBA::string_dup(), TAO_debug_level, and translate_qos().

01992 {
01993   if (TAO_debug_level > 0)
01994     ACE_DEBUG ((LM_DEBUG,
01995                 "\n(%P|%t) TAO_StreamEndPoint::request_connection called"));
01996 
01997   int result = 0;
01998   try
01999     {
02000       AVStreams::streamQoS network_qos;
02001       if (qos.length () > 0)
02002         {
02003          if (TAO_debug_level > 0)
02004           ACE_DEBUG ((LM_DEBUG,
02005                       "QoS is Specified\n"));
02006 
02007           int result = this->translate_qos (qos, network_qos);
02008           if (result != 0)
02009             if (TAO_debug_level > 0)
02010               ACE_DEBUG ((LM_DEBUG, "QoS translation failed\n"));
02011 
02012           this->qos ().set (network_qos);
02013         }
02014 
02015       if (TAO_debug_level > 0)
02016         ACE_DEBUG ((LM_DEBUG,
02017                     "\n(%P|%t) TAO_StreamEndPoint::request_connection: "
02018                     "flowspec has length = %d and the strings are:\n",
02019                     flow_spec.length ()));
02020       CORBA::ULong i;
02021 
02022       for (i=0;i<flow_spec.length ();i++)
02023         {
02024           TAO_Forward_FlowSpec_Entry *entry = 0;
02025           ACE_NEW_RETURN (entry,
02026                           TAO_Forward_FlowSpec_Entry,
02027                           0);
02028 
02029           CORBA::String_var string_entry = CORBA::string_dup (flow_spec[i]);
02030 
02031           if(TAO_debug_level > 0)
02032              ACE_DEBUG(( LM_DEBUG,
02033                          "%N:%l Parsing flow spec: [%s]\n",
02034                          string_entry.in ()));
02035 
02036           if (entry->parse (string_entry.in ()) == -1)
02037           {
02038             if (TAO_debug_level > 0)
02039               ACE_DEBUG ((LM_DEBUG,
02040                           "%N:%l Error parsing flow_spec: [%s]\n",
02041                           string_entry.in ()));
02042             return 0;
02043           }
02044           if (TAO_debug_level > 0)
02045             ACE_DEBUG ((LM_DEBUG,
02046                         "TAO_StreamEndPoint::request_connection flow spec [%s]\n",
02047                         entry->entry_to_string ()));
02048 
02049           this->forward_flow_spec_set.insert (entry);
02050         }
02051 
02052       result = TAO_AV_CORE::instance ()->init_forward_flows (this,
02053                                                              this->forward_flow_spec_set,
02054                                                              TAO_AV_Core::TAO_AV_ENDPOINT_B,
02055                                                              flow_spec);
02056 
02057       if (result < 0)
02058         return 0;
02059 
02060       // Make the upcall to the app
02061       result = this->handle_connection_requested (flow_spec);
02062     }
02063   catch (const CORBA::Exception& ex)
02064     {
02065       ex._tao_print_exception ("TAO_StreamEndpoint::request_connection");
02066       return 0;
02067     }
02068   return result;
02069 }

void TAO_StreamEndPoint::set_FPStatus const AVStreams::flowSpec the_spec,
const char *  fp_name,
const CORBA::Any &  fp_settings
[virtual]
 

Used to control the flow.

Definition at line 2157 of file AVStreams_i.cpp.

References AVStreams::flowSpec, sfp_status_, and ACE_OS::strcmp().

02160 {
02161   if (ACE_OS::strcmp (fp_name, "SFP1.0") != 0)
02162     return;
02163   fp_settings >>= this->sfp_status_;
02164   // @@Naga: We should call set_FPStatus on all the protocol objects.
02165 }

void TAO_StreamEndPoint::set_key const char *  flow_name,
const AVStreams::key the_key
[virtual]
 

Used for public key encryption.

Definition at line 2314 of file AVStreams_i.cpp.

References AVStreams::key, and ACE_OS::sprintf().

02316 {
02317   try
02318     {
02319       this->key_ = the_key;
02320       CORBA::Any PublicKey;
02321       PublicKey <<= the_key;
02322       char PublicKey_property [BUFSIZ];
02323       ACE_OS::sprintf (PublicKey_property, "%s_PublicKey", flow_name);
02324       this->define_property (PublicKey_property,
02325                              PublicKey);
02326     }
02327   catch (const CORBA::Exception& ex)
02328     {
02329       ex._tao_print_exception ("TAO_StreamEndPoint::set_key");
02330     }
02331 }

void TAO_StreamEndPoint::set_negotiator AVStreams::Negotiator_ptr  new_negotiator  )  [virtual]
 

Used to "attach" a negotiator to the endpoint.

Definition at line 2294 of file AVStreams_i.cpp.

References negotiator_.

02295 {
02296   try
02297     {
02298       CORBA::Any negotiator;
02299       negotiator <<= new_negotiator;
02300       this->define_property ("Negotiator",
02301                              negotiator);
02302       this->negotiator_ = AVStreams::Negotiator::_duplicate (new_negotiator);
02303     }
02304   catch (const CORBA::Exception& ex)
02305     {
02306       ex._tao_print_exception (
02307         "TAO_StreamEndPoint::set_negotiator");
02308     }
02309 }

CORBA::Boolean TAO_StreamEndPoint::set_protocol_restriction const AVStreams::protocolSpec the_pspec  )  [virtual]
 

Used to restrict the set of protocols.

Definition at line 2127 of file AVStreams_i.cpp.

References protocols_, and AVStreams::protocolSpec.

02128 {
02129   try
02130     {
02131       CORBA::Any protocol_restriction_any;
02132 
02133       protocol_restriction_any <<= protocols;
02134       this->define_property ("ProtocolRestriction",
02135                              protocol_restriction_any);
02136       this->protocols_ = protocols;
02137     }
02138   catch (const CORBA::Exception& ex)
02139     {
02140       ex._tao_print_exception (
02141         "TAO_StreamEndPoint::set_protocol_restriction");
02142       return 0;
02143     }
02144   return 1;
02145 }

void TAO_StreamEndPoint::set_source_id CORBA::Long  source_id  )  [virtual]
 

Used to set a unique id for packets sent by this streamendpoint.

Definition at line 2335 of file AVStreams_i.cpp.

02336 {
02337   this->source_id_ = source_id;
02338 }

void TAO_StreamEndPoint::start const AVStreams::flowSpec the_spec  )  [virtual]
 

Start the stream, Empty the_spec means, for all the flows.

Definition at line 1773 of file AVStreams_i.cpp.

References ACE_DEBUG, ACE_Unbounded_Set< T >::begin(), TAO_FlowSpec_Entry::control_handler(), ACE_Unbounded_Set< T >::end(), TAO_FlowSpec_Entry::flowname(), AVStreams::flowSpec, forward_flow_spec_set, TAO_Base_StreamEndPoint::handle_start(), TAO_FlowSpec_Entry::handler(), LM_DEBUG, reverse_flow_spec_set, TAO_FlowSpec_Entry::role(), TAO_AV_Flow_Handler::start(), ACE_OS::strcmp(), TAO_AV_FlowSpecSetItor, and TAO_debug_level.

01774 {
01775   if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_StreamEndPoint::start\n"));
01776   // Make the upcall into the app
01777   this->handle_start (flow_spec);
01778 
01779   if (flow_spec.length () > 0)
01780     {
01781       // Now call start on all the flow handlers.
01782       for (u_int i=0;i<flow_spec.length ();i++)
01783         {
01784           TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01785           for (TAO_AV_FlowSpecSetItor forward_begin = this->forward_flow_spec_set.begin ();
01786                forward_begin != end; ++forward_begin)
01787             {
01788               TAO_FlowSpec_Entry *entry = *forward_begin;
01789               if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
01790                 {
01791                   //                  entry->protocol_object ()->start ();
01792                   if (entry->handler () != 0)
01793                   {
01794                     entry->handler ()->start (entry->role ());
01795                   }
01796                   if (entry->control_handler () != 0)
01797                   {
01798                     entry->control_handler ()->start (entry->role ());
01799                   }
01800                 }
01801             }
01802 
01803           end = this->reverse_flow_spec_set.end ();
01804           for (TAO_AV_FlowSpecSetItor reverse_begin = this->reverse_flow_spec_set.begin ();
01805                reverse_begin != end; ++reverse_begin)
01806             {
01807               TAO_FlowSpec_Entry *entry = *reverse_begin;
01808               if (ACE_OS::strcmp (entry->flowname (), flow_spec [i]) == 0)
01809                 {
01810                   //                  entry->protocol_object ()->start ();
01811                   if (entry->handler () != 0)
01812                   {
01813                     entry->handler ()->start (entry->role ());
01814                   }
01815                   if (entry->control_handler () != 0)
01816                   {
01817                     entry->control_handler ()->start (entry->role ());
01818                   }
01819                 }
01820             }
01821         }
01822     }
01823   else
01824     {
01825       TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01826       for (TAO_AV_FlowSpecSetItor forwardbegin = this->forward_flow_spec_set.begin ();
01827            forwardbegin != end; ++forwardbegin)
01828         {
01829           TAO_FlowSpec_Entry *entry = *forwardbegin;
01830           if (entry->handler () != 0)
01831             {
01832               entry->handler ()->start (entry->role ());
01833             }
01834           if (entry->control_handler () != 0)
01835             {
01836               entry->control_handler ()->start (entry->role ());
01837             }
01838         }
01839 
01840       end = this->reverse_flow_spec_set.end ();
01841       for (TAO_AV_FlowSpecSetItor reversebegin = this->reverse_flow_spec_set.begin ();
01842            reversebegin != end; ++reversebegin)
01843         {
01844           TAO_FlowSpec_Entry *entry = *reversebegin;
01845           //          entry->protocol_object ()->start ();
01846           if (entry->handler () != 0)
01847             {
01848               entry->handler ()->start (entry->role ());
01849             }
01850           if (entry->control_handler () != 0)
01851             {
01852               entry->control_handler ()->start (entry->role ());
01853             }
01854         }
01855     }
01856 }

void TAO_StreamEndPoint::stop const AVStreams::flowSpec the_spec  )  [virtual]
 

Stop the stream. Empty the_spec means, for all the flows.

Definition at line 1725 of file AVStreams_i.cpp.

References ACE_Unbounded_Set< T >::begin(), TAO_FlowSpec_Entry::control_handler(), ACE_Unbounded_Set< T >::end(), TAO_FlowSpec_Entry::flowname(), AVStreams::flowSpec, forward_flow_spec_set, TAO_Base_StreamEndPoint::handle_stop(), TAO_FlowSpec_Entry::handler(), TAO_Forward_FlowSpec_Entry::parse(), TAO_FlowSpec_Entry::role(), TAO_AV_Flow_Handler::stop(), ACE_OS::strcmp(), and TAO_AV_FlowSpecSetItor.

01726 {
01727   // Make the upcall into the app
01728   this->handle_stop (flow_spec);
01729 
01730   if (flow_spec.length () > 0)
01731     {
01732 
01733       for (u_int i=0;i<flow_spec.length ();i++)
01734         {
01735           TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01736           for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01737                begin != end; ++begin)
01738             {
01739               TAO_Forward_FlowSpec_Entry entry;
01740               entry.parse (flow_spec[i]);
01741               if (ACE_OS::strcmp ((*begin)->flowname (), entry.flowname ()) == 0)
01742                {
01743                  TAO_FlowSpec_Entry *entry = *begin;
01744                  //                  (*begin)->protocol_object ()->stop ();
01745                  if (entry->handler() != 0)
01746                    entry->handler ()->stop (entry->role ());
01747                  if (entry->control_handler () != 0)
01748                    entry->control_handler ()->stop (entry->role ());
01749                  break;
01750                }
01751             }
01752         }
01753     }
01754   else
01755     {
01756       TAO_AV_FlowSpecSetItor end = this->forward_flow_spec_set.end ();
01757       for (TAO_AV_FlowSpecSetItor begin = this->forward_flow_spec_set.begin ();
01758            begin != end; ++begin)
01759         {
01760           TAO_FlowSpec_Entry *entry = *begin;
01761           //          entry->protocol_object ()->stop ();
01762           if (entry->handler() != 0)
01763             entry->handler ()->stop (entry->role ());
01764           if (entry->control_handler () != 0)
01765             entry->control_handler ()->stop (entry->role ());
01766         }
01767     }
01768 }

int TAO_StreamEndPoint::translate_qos const AVStreams::streamQoS application_qos,
AVStreams::streamQoS network_qos
[protected]
 

translate from application level to network level qos.

Definition at line 1708 of file AVStreams_i.cpp.

References AVStreams::streamQoS.

Referenced by connect(), and request_connection().

01710 {
01711   u_int len = application_qos.length ();
01712   network_qos.length (len);
01713   for (u_int i=0;i<len;i++)
01714     {
01715       network_qos [i].QoSType = application_qos [i].QoSType;
01716       network_qos [i].QoSParams = application_qos [i].QoSParams;
01717     }
01718   return 0;
01719 }


Member Data Documentation

FlowEndPoint_Map TAO_StreamEndPoint::fep_map_ [protected]
 

hash table for the flownames and its corresponding flowEndpoint reference.

Definition at line 546 of file AVStreams_i.h.

Referenced by add_fep(), get_fep(), and remove_fep().

u_int TAO_StreamEndPoint::flow_count_ [protected]
 

Count of the number of flows in this streamendpoint, used to generate unique names for the flows.

Definition at line 540 of file AVStreams_i.h.

u_int TAO_StreamEndPoint::flow_num_ [protected]
 

current flow number used for system generation of flow names.

Definition at line 543 of file AVStreams_i.h.

AVStreams::flowSpec TAO_StreamEndPoint::flows_ [protected]
 

sequence of supported flow names.

Definition at line 549 of file AVStreams_i.h.

TAO_AV_FlowSpecSet TAO_StreamEndPoint::forward_flow_spec_set [protected]
 

Definition at line 571 of file AVStreams_i.h.

Referenced by connect(), destroy(), request_connection(), start(), stop(), and ~TAO_StreamEndPoint().

AVStreams::key TAO_StreamEndPoint::key_ [protected]
 

Key used for encryption.

Definition at line 564 of file AVStreams_i.h.

ACE_CString TAO_StreamEndPoint::mcast_addr_ [protected]
 

Definition at line 569 of file AVStreams_i.h.

Referenced by TAO_StreamEndPoint().

ACE_Hash_Map_Manager<ACE_CString, TAO_FlowSpec_Entry*,ACE_Null_Mutex> TAO_StreamEndPoint::mcast_entry_map_ [protected]
 

Definition at line 570 of file AVStreams_i.h.

u_short TAO_StreamEndPoint::mcast_port_ [protected]
 

TAO_Forward_FlowSpec_Entry forward_entries_ [FLOWSPEC_MAX]; TAO_Reverse_FlowSpec_Entry reverse_entries_ [FLOWSPEC_MAX];

Definition at line 568 of file AVStreams_i.h.

AVStreams::Negotiator_var TAO_StreamEndPoint::negotiator_ [protected]
 

our local negotiator for QoS.

Definition at line 555 of file AVStreams_i.h.

Referenced by connect(), and set_negotiator().

AVStreams::StreamEndPoint_var TAO_StreamEndPoint::peer_sep_ [protected]
 

Definition at line 573 of file AVStreams_i.h.

Referenced by connect().

CORBA::String_var TAO_StreamEndPoint::protocol_ [protected]
 

Chosen protocol for this streamendpoint based on availableprotocols property.

Definition at line 561 of file AVStreams_i.h.

Referenced by connect().

AVStreams::protocolSpec TAO_StreamEndPoint::protocols_ [protected]
 

Our available list of protocols.

Definition at line 558 of file AVStreams_i.h.

Referenced by connect(), and set_protocol_restriction().

TAO_AV_FlowSpecSet TAO_StreamEndPoint::reverse_flow_spec_set [protected]
 

Definition at line 572 of file AVStreams_i.h.

Referenced by connect(), destroy(), start(), and ~TAO_StreamEndPoint().

AVStreams::SFPStatus* TAO_StreamEndPoint::sfp_status_ [protected]
 

Definition at line 574 of file AVStreams_i.h.

Referenced by set_FPStatus().

CORBA::Long TAO_StreamEndPoint::source_id_ [protected]
 

source id used for multicast.

Definition at line 552 of file AVStreams_i.h.

AVStreams::StreamCtrl_var TAO_StreamEndPoint::streamctrl_ [protected]
 

Definition at line 575 of file AVStreams_i.h.


The documentation for this class was generated from the following files:
Generated on Sun Jan 27 16:08:13 2008 for TAO_AV by doxygen 1.3.6