#include <AVStreams_i.h>
Collaboration diagram for TAO_FlowConnection:
Public Member Functions | |
TAO_FlowConnection (void) | |
default constructor. | |
virtual void | stop (void) |
stop this flow. | |
virtual void | start (void) |
start this flow. | |
virtual void | destroy (void) |
destroy this flow. | |
virtual CORBA::Boolean | modify_QoS (AVStreams::QoS &new_qos) |
modify the QoS for this flow. | |
virtual CORBA::Boolean | use_flow_protocol (const char *fp_name, const CORBA::Any &fp_settings) |
use the specified flow protocol for this flow. | |
virtual void | push_event (const AVStreams::streamEvent &the_event) |
pushes an event, to be handled by the application. | |
virtual CORBA::Boolean | connect_devs (AVStreams::FDev_ptr a_party, AVStreams::FDev_ptr b_party, AVStreams::QoS &the_qos) |
connect 2 Flow Devices. | |
virtual CORBA::Boolean | connect (AVStreams::FlowProducer_ptr flow_producer, AVStreams::FlowConsumer_ptr flow_consumer, AVStreams::QoS &the_qos) |
Connect a flow producer and consumer under this flow connection. | |
virtual CORBA::Boolean | disconnect (void) |
disconnect this flow connection. | |
virtual CORBA::Boolean | add_producer (AVStreams::FlowProducer_ptr flow_producer, AVStreams::QoS &the_qos) |
adds the producer to this flow connection. | |
virtual CORBA::Boolean | add_consumer (AVStreams::FlowConsumer_ptr flow_consumer, AVStreams::QoS &the_qos) |
adds a consumer to this flow connection. | |
virtual CORBA::Boolean | drop (AVStreams::FlowEndPoint_ptr target) |
drops a flow endpoint from the flow. | |
int | set_mcast_addr (ACE_CString addr, u_short port) |
void | set_protocol (const char *protocol) |
Protected Types | |
typedef ACE_Unbounded_Set< AVStreams::FlowProducer_ptr > | FlowProducer_Set |
typedef ACE_Unbounded_Set_Iterator< AVStreams::FlowProducer_ptr > | FlowProducer_SetItor |
typedef ACE_Unbounded_Set< AVStreams::FlowConsumer_ptr > | FlowConsumer_Set |
typedef ACE_Unbounded_Set_Iterator< AVStreams::FlowConsumer_ptr > | FlowConsumer_SetItor |
Protected Attributes | |
FlowProducer_Set | flow_producer_set_ |
The multicast address returned by the producer. | |
FlowConsumer_Set | flow_consumer_set_ |
CORBA::String_var | fp_name_ |
CORBA::Any | fp_settings_ |
CORBA::String_var | producer_address_ |
int | ip_multicast_ |
IP Multicasting is used. | |
TAO_MCastConfigIf * | mcastconfigif_i_ |
AVStreams::MCastConfigIf_var | mcastconfigif_ |
u_short | mcast_port_ |
ACE_CString | mcast_addr_ |
CORBA::String_var | protocol_ |
Definition at line 801 of file AVStreams_i.h.
|
Definition at line 859 of file AVStreams_i.h. |
|
Definition at line 860 of file AVStreams_i.h. Referenced by add_consumer(), destroy(), start(), stop(), and use_flow_protocol(). |
|
Definition at line 857 of file AVStreams_i.h. |
|
Definition at line 858 of file AVStreams_i.h. Referenced by add_consumer(), add_producer(), destroy(), start(), stop(), and use_flow_protocol(). |
|
default constructor.
Definition at line 3319 of file AVStreams_i.cpp.
03320 :fp_name_ (CORBA::string_dup ("")), 03321 ip_multicast_ (0) 03322 { 03323 } |
|
adds a consumer to this flow connection.
Definition at line 3674 of file AVStreams_i.cpp. References ACE_ERROR_RETURN, ACE_Unbounded_Set< T >::begin(), ACE_Unbounded_Set< T >::end(), flow_consumer_set_, flow_producer_set_, FlowConsumer_SetItor, FlowProducer_SetItor, AVStreams::flowSpec, fp_name_, ACE_Unbounded_Set< T >::insert(), ip_multicast_, CORBA::is_nil(), LM_ERROR, LM_WARNING, AVStreams::protocolSpec, AVStreams::streamQoS, and CORBA::string_dup().
03676 { 03677 try 03678 { 03679 AVStreams::FlowConsumer_ptr flow_consumer = 03680 AVStreams::FlowConsumer::_duplicate (consumer); 03681 FlowConsumer_SetItor begin = this->flow_consumer_set_.begin (); 03682 FlowConsumer_SetItor end = this->flow_consumer_set_.end (); 03683 for (; begin != end; ++begin) 03684 { 03685 if ((*begin)->_is_equivalent (consumer)) 03686 // Consumer exists in the set, a duplicate. 03687 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_Consumer: Consumer already exists\n"), 1); 03688 } 03689 int result = this->flow_consumer_set_.insert (flow_consumer); 03690 if (result == 1) 03691 { 03692 // consumer exists in the set, a duplicate. 03693 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_consumer: consumer already exists\n"), 1); 03694 } 03695 03696 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin (); 03697 // @@Lets take that the first entry as the only producer. We're 03698 // not sure if we can have multiple flow producers in a 03699 // flowconnection. We can have multiple producer in the MtM binding, 03700 // in which case the first producer that gets added is the leader. 03701 AVStreams::FlowProducer_ptr flow_producer = (*producer_begin); 03702 03703 AVStreams::protocolSpec protocols (1); 03704 protocols.length (1); 03705 protocols [0] = CORBA::string_dup (this->producer_address_.in ()); 03706 03707 if (!this->ip_multicast_) 03708 { 03709 flow_consumer->set_protocol_restriction (protocols); 03710 char * address = 03711 flow_consumer->go_to_listen (the_qos, 03712 1, 03713 flow_producer, 03714 this->fp_name_.inout ()); 03715 CORBA::Boolean is_met; 03716 flow_producer->connect_mcast (the_qos, 03717 is_met, 03718 address, 03719 this->fp_name_.inout ()); 03720 } 03721 else 03722 { 03723 // The spec says go_to_listen is called with the multicast 03724 // address returned from the connect_mcast call called 03725 // during add_producer. But go_to_listen doesn't have a 03726 // address parameter. 
I guess it should be connect_to_peer. 03727 // IP Multicasting. 03728 flow_consumer->connect_to_peer (the_qos, 03729 this->producer_address_.in (), 03730 this->fp_name_.inout ()); 03731 03732 // char * address = 03733 // flow_consumer->go_to_listen (the_qos, 03734 // 1, 03735 // flow_producer, 03736 // this->fp_name_.inout () 03737 //); 03738 03739 } 03740 if (CORBA::is_nil (this->mcastconfigif_.in ())) 03741 ACE_ERROR_RETURN ((LM_ERROR, "TAO_FlowConnection::add_consumer: first add a producer and then a consumer\n"), 0); 03742 // @@ Is this the right place to do set_peer? 03743 AVStreams::flowSpec flow_spec; 03744 AVStreams::streamQoS stream_qos (1); 03745 stream_qos.length (1); 03746 stream_qos [0] = the_qos; 03747 this->mcastconfigif_->set_peer (flow_consumer, 03748 stream_qos, 03749 flow_spec); 03750 } 03751 catch (const CORBA::Exception& ex) 03752 { 03753 ex._tao_print_exception ( 03754 "TAO_FlowConnection::add_consumer"); 03755 return 0; 03756 } 03757 return 1; 03758 } |
|
adds the producer to this flow connection.
Definition at line 3585 of file AVStreams_i.cpp. References ACE_ERROR_RETURN, ACE_NEW_RETURN, ACE_INET_Addr::addr_to_string(), TAO_FlowSpec_Entry::address(), ACE_Unbounded_Set< T >::begin(), ACE_Unbounded_Set< T >::end(), flow_producer_set_, FlowProducer_SetItor, ACE_Unbounded_Set< T >::insert(), ip_multicast_, CORBA::is_nil(), LM_WARNING, mcastconfigif_i_, producer_address_, ACE_INET_Addr::set(), ACE_OS::sprintf(), and ACE_OS::strcpy().
03587 { 03588 try 03589 { 03590 AVStreams::FlowProducer_ptr flow_producer = 03591 AVStreams::FlowProducer::_duplicate (producer); 03592 // @@Naga:Sometimes the same producer could be added with a different object reference. 03593 // There's no portable way of comparing obj refs. but we have to do this till we find 03594 // a permanent solution.For eg. 2 different flowproducers for the same flow in a 03595 // Multipoint to Multipoint binding will have the same flowname and hence cannot be 03596 // used for resolving ties. 03597 FlowProducer_SetItor begin = this->flow_producer_set_.begin (); 03598 FlowProducer_SetItor end = this->flow_producer_set_.end (); 03599 for (; begin != end; ++begin) 03600 { 03601 if ((*begin)->_is_equivalent (producer)) 03602 // producer exists in the set, a duplicate. 03603 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1); 03604 } 03605 // We need to check the return value of the insert into the flow producer 03606 // set, since multiconnect could be called many times which will lead to 03607 // a call to add_producer every time a sink is added. If the producer is already 03608 // present in our list we just return immediately. 03609 int result = this->flow_producer_set_.insert (flow_producer); 03610 if (result == 1) 03611 { 03612 // producer exists in the set, a duplicate. 
03613 ACE_ERROR_RETURN ((LM_WARNING, "TAO_FlowConnection::add_producer: producer already exists\n"), 1); 03614 } 03615 CORBA::Boolean met_qos; 03616 char mcast_address[BUFSIZ]; 03617 if (this->producer_address_.in () == 0) 03618 { 03619 ACE_INET_Addr mcast_addr; 03620 mcast_addr.set (this->mcast_port_, 03621 this->mcast_addr_.c_str () 03622 ); 03623 03624 char buf [BUFSIZ]; 03625 mcast_addr.addr_to_string (buf, BUFSIZ); 03626 ACE_OS::sprintf (mcast_address, "%s=%s", this->protocol_.in (), buf); 03627 } 03628 else 03629 { 03630 ACE_OS::strcpy (mcast_address, this->producer_address_.in ()); 03631 } 03632 char *address = flow_producer->connect_mcast (the_qos, 03633 met_qos, 03634 mcast_address, 03635 this->fp_name_.in ()); 03636 03637 if (this->producer_address_.in () == 0) 03638 { 03639 TAO_Forward_FlowSpec_Entry entry ("", "", "", "", address); 03640 if (entry.address () != 0) 03641 { 03642 // Internet multicasting is in use. 03643 this->producer_address_ = address; 03644 } 03645 else 03646 { 03647 // ATM Multicasting is in use. 03648 this->ip_multicast_ = 0; 03649 } 03650 } 03651 // set the multicast peer. 03652 if (CORBA::is_nil (this->mcastconfigif_.in ())) 03653 { 03654 ACE_NEW_RETURN (this->mcastconfigif_i_, 03655 TAO_MCastConfigIf, 03656 0); 03657 this->mcastconfigif_ = this->mcastconfigif_i_->_this (); 03658 } 03659 AVStreams::FlowConnection_var flowconnection = this->_this (); 03660 flow_producer->set_Mcast_peer (flowconnection.in (), 03661 this->mcastconfigif_.in (), 03662 the_qos); 03663 } 03664 catch (const CORBA::Exception& ex) 03665 { 03666 ex._tao_print_exception ( 03667 "TAO_FlowConnection::add_producer"); 03668 return 0; 03669 } 03670 return 1; 03671 } |
|
Connect a flow producer and consumer under this flow connection.
Definition at line 3518 of file AVStreams_i.cpp. References ACE_DEBUG, flow_consumer_set_, flow_producer_set_, ACE_Unbounded_Set< T >::insert(), LM_DEBUG, ACE_OS::strcmp(), and TAO_debug_level. Referenced by connect_devs().
03521 { 03522 try 03523 { 03524 03525 AVStreams::FlowProducer_ptr flow_producer = 03526 AVStreams::FlowProducer::_duplicate (producer); 03527 AVStreams::FlowConsumer_ptr flow_consumer = 03528 AVStreams::FlowConsumer::_duplicate (consumer); 03529 03530 this->flow_producer_set_.insert (flow_producer); 03531 this->flow_consumer_set_.insert (flow_consumer); 03532 AVStreams::FlowConnection_var flowconnection = 03533 this->_this (); 03534 03535 flow_producer->set_peer (flowconnection.in (), 03536 flow_consumer, 03537 the_qos); 03538 03539 flow_consumer->set_peer (flowconnection.in (), 03540 flow_producer, 03541 the_qos); 03542 03543 char *consumer_address = 03544 flow_consumer->go_to_listen (the_qos, 03545 0, // false for is_mcast 03546 flow_producer, 03547 this->fp_name_.inout ()); 03548 03549 if (ACE_OS::strcmp (consumer_address, "") == 0) 03550 { 03551 // Consumer is not willing to listen, so try the producer. 03552 consumer_address = flow_producer->go_to_listen (the_qos, 03553 0, // false for is_mcast 03554 flow_consumer, 03555 this->fp_name_.inout ()); 03556 flow_consumer->connect_to_peer (the_qos, 03557 consumer_address, 03558 this->fp_name_.inout ()); 03559 // @@ Naga: We have to find means to set the reverse channel for the producer. 03560 // Its broken in the point-to_point case for UDP. 03561 } 03562 else 03563 { 03564 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::connect_to_peer addres: %s", consumer_address)); 03565 flow_producer->connect_to_peer (the_qos, 03566 consumer_address, 03567 this->fp_name_.inout ()); 03568 } 03569 } 03570 catch (const CORBA::Exception& ex) 03571 { 03572 ex._tao_print_exception ("TAO_FlowConnection::connect"); 03573 } 03574 return 1; 03575 } |
|
connect 2 Flow Devices.
Definition at line 3483 of file AVStreams_i.cpp. References connect().
03486 { 03487 CORBA::Boolean result = 0; 03488 try 03489 { 03490 AVStreams::FlowConnection_var flowconnection = this->_this (); 03491 CORBA::Boolean met_qos; 03492 CORBA::String_var named_fdev ((const char *)""); 03493 AVStreams::FlowProducer_var producer = 03494 a_party->create_producer (flowconnection.in (), 03495 flow_qos, 03496 met_qos, 03497 named_fdev.inout ()); 03498 AVStreams::FlowConsumer_var consumer = 03499 b_party->create_consumer (flowconnection.in (), 03500 flow_qos, 03501 met_qos, 03502 named_fdev.inout ()); 03503 result = this->connect (producer.in (), 03504 consumer.in (), 03505 flow_qos); 03506 } 03507 catch (const CORBA::Exception& ex) 03508 { 03509 ex._tao_print_exception ( 03510 "TAO_FlowConnection::connect_devs"); 03511 return 0; 03512 } 03513 return result; 03514 } |
|
destroy this flow.
Definition at line 3409 of file AVStreams_i.cpp. References ACE_DEBUG, ACE_Unbounded_Set< T >::begin(), TAO_AV_Core::deactivate_servant(), ACE_Unbounded_Set< T >::end(), flow_consumer_set_, flow_producer_set_, FlowConsumer_SetItor, FlowProducer_SetItor, LM_DEBUG, and TAO_debug_level.
03410 { 03411 try 03412 { 03413 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin 03414 (); 03415 for (FlowProducer_SetItor producer_end = 03416 this->flow_producer_set_.end (); 03417 producer_begin != producer_end; ++producer_begin) 03418 { 03419 (*producer_begin)->destroy (); 03420 } 03421 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin 03422 (); 03423 for (FlowConsumer_SetItor consumer_end = 03424 this->flow_consumer_set_.end (); 03425 consumer_begin != consumer_end; ++consumer_begin) 03426 { 03427 (*consumer_begin)->destroy (); 03428 } 03429 } 03430 catch (const CORBA::Exception& ex) 03431 { 03432 ex._tao_print_exception ("TAO_FlowConnection::destroy"); 03433 return; 03434 } 03435 int result = TAO_AV_Core::deactivate_servant (this); 03436 if (result < 0) 03437 if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO_FlowConnection::destroy failed\n")); 03438 } |
|
disconnect this flow connection.
Definition at line 3579 of file AVStreams_i.cpp.
03580 {
03581 return 0;
03582 }
|
|
drops a flow endpoint from the flow.
Definition at line 3761 of file AVStreams_i.cpp.
03762 {
03763 ACE_UNUSED_ARG (target);
03764 return 0;
03765 }
|
|
modify the QoS for this flow.
Definition at line 3442 of file AVStreams_i.cpp.
03443 {
03444 ACE_UNUSED_ARG (new_qos);
03445 return 0;
03446 }
|
|
pushes an event, to be handled by the application.
Definition at line 3477 of file AVStreams_i.cpp. References AVStreams::streamEvent.
03478 { 03479 ACE_UNUSED_ARG (the_event); 03480 } |
|
Definition at line 3334 of file AVStreams_i.cpp. Referenced by TAO_StreamEndPoint_A::multiconnect().
03335 { 03336 this->mcast_addr_ = mcast_addr; 03337 this->mcast_port_ = mcast_port; 03338 return 0; 03339 } |
|
Definition at line 3342 of file AVStreams_i.cpp. Referenced by TAO_StreamEndPoint_A::multiconnect().
03343 { 03344 this->protocol_ = protocol; 03345 } |
|
start this flow.
Definition at line 3379 of file AVStreams_i.cpp. References ACE_Unbounded_Set< T >::begin(), ACE_Unbounded_Set< T >::end(), flow_consumer_set_, flow_producer_set_, FlowConsumer_SetItor, and FlowProducer_SetItor.
03380 { 03381 try 03382 { 03383 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin 03384 (); 03385 for (FlowConsumer_SetItor consumer_end = 03386 this->flow_consumer_set_.end (); 03387 consumer_begin != consumer_end; ++consumer_begin) 03388 { 03389 (*consumer_begin)->start (); 03390 } 03391 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin 03392 (); 03393 for (FlowProducer_SetItor producer_end = 03394 this->flow_producer_set_.end (); 03395 producer_begin != producer_end; ++producer_begin) 03396 { 03397 (*producer_begin)->start (); 03398 } 03399 } 03400 catch (const CORBA::Exception& ex) 03401 { 03402 ex._tao_print_exception ("TAO_FlowConnection::start"); 03403 return; 03404 } 03405 } |
|
stop this flow.
Definition at line 3349 of file AVStreams_i.cpp. References ACE_Unbounded_Set< T >::begin(), ACE_Unbounded_Set< T >::end(), flow_consumer_set_, flow_producer_set_, FlowConsumer_SetItor, and FlowProducer_SetItor.
03350 { 03351 try 03352 { 03353 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin 03354 (); 03355 for (FlowProducer_SetItor producer_end = 03356 this->flow_producer_set_.end (); 03357 producer_begin != producer_end; ++producer_begin) 03358 { 03359 (*producer_begin)->stop (); 03360 } 03361 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin 03362 (); 03363 for (FlowConsumer_SetItor consumer_end = 03364 this->flow_consumer_set_.end (); 03365 consumer_begin != consumer_end; ++consumer_begin) 03366 { 03367 (*consumer_begin)->stop (); 03368 } 03369 } 03370 catch (const CORBA::Exception& ex) 03371 { 03372 ex._tao_print_exception ("TAO_FlowConnection::stop"); 03373 return; 03374 } 03375 } |
|
use the specified flow protocol for this flow.
Definition at line 3450 of file AVStreams_i.cpp. References ACE_Unbounded_Set< T >::begin(), ACE_Unbounded_Set< T >::end(), flow_consumer_set_, flow_producer_set_, FlowConsumer_SetItor, FlowProducer_SetItor, fp_name_, and fp_settings_.
03452 { 03453 this->fp_name_ = fp_name; 03454 this->fp_settings_ = fp_settings; 03455 FlowProducer_SetItor producer_begin = this->flow_producer_set_.begin 03456 (); 03457 for (FlowProducer_SetItor producer_end = 03458 this->flow_producer_set_.end (); 03459 producer_begin != producer_end; ++producer_begin) 03460 { 03461 (*producer_begin)->use_flow_protocol 03462 (fp_name, fp_settings); 03463 } 03464 FlowConsumer_SetItor consumer_begin = this->flow_consumer_set_.begin 03465 (); 03466 for (FlowConsumer_SetItor consumer_end = 03467 this->flow_consumer_set_.end (); 03468 consumer_begin != consumer_end; ++consumer_begin) 03469 { 03470 (*consumer_begin)->use_flow_protocol 03471 (fp_name, fp_settings); 03472 } 03473 return 1; 03474 } |
|
Definition at line 864 of file AVStreams_i.h. Referenced by add_consumer(), connect(), destroy(), start(), stop(), and use_flow_protocol(). |
|
The multicast address returned by the producer.
Definition at line 863 of file AVStreams_i.h. Referenced by add_consumer(), add_producer(), connect(), destroy(), start(), stop(), and use_flow_protocol(). |
|
Definition at line 865 of file AVStreams_i.h. Referenced by add_consumer(), and use_flow_protocol(). |
|
Definition at line 866 of file AVStreams_i.h. Referenced by use_flow_protocol(). |
|
IP Multicasting is used.
Definition at line 870 of file AVStreams_i.h. Referenced by add_consumer(), and add_producer(). |
|
Definition at line 874 of file AVStreams_i.h. |
|
Definition at line 873 of file AVStreams_i.h. |
|
Definition at line 872 of file AVStreams_i.h. |
|
Definition at line 871 of file AVStreams_i.h. Referenced by add_producer(). |
|
Definition at line 867 of file AVStreams_i.h. Referenced by add_producer(). |
|
Definition at line 875 of file AVStreams_i.h. |