#include <ECG_Mcast_EH.h>
Public Member Functions | |
virtual int | handle_input (ACE_HANDLE fd) |
TAO_ECG_Mcast_EH (TAO_ECG_Dgram_Handler *recv, const ACE_TCHAR *net_if=0, CORBA::ULong buf_sz=0) | |
Initialization and termination methods. | |
virtual | ~TAO_ECG_Mcast_EH (void) |
Destructor. | |
void | open (RtecEventChannelAdmin::EventChannel_ptr ec) |
virtual int | shutdown (void) |
TAO_ECG_Handler_Shutdown method. | |
Private Types | |
typedef ACE_Unbounded_Set< ACE_INET_Addr > | Address_Set |
typedef ACE_Array_Base< Subscription > | Subscriptions |
Private Member Functions | |
void | update_consumer (const RtecEventChannelAdmin::ConsumerQOS &sub) |
void | compute_required_subscriptions (const RtecEventChannelAdmin::ConsumerQOS &sub, Address_Set &multicast_addresses) |
int | delete_unwanted_subscriptions (Address_Set &multicast_addresses) |
void | add_new_subscriptions (Address_Set &multicast_addresses) |
Private Attributes | |
ACE_TCHAR * | net_if_ |
The NIC used to subscribe for multicast traffic. | |
Subscriptions | subscriptions_ |
List of multicast addresses we subscribe to and dgrams we use. | |
TAO_ECG_Dgram_Handler * | receiver_ |
We callback to this object when a message arrives. | |
CORBA::ULong | recvbuf_size_ |
SOCKbuf size. | |
TAO_EC_Servant_Var< Observer > | observer_ |
TAO_EC_Auto_Command< Observer_Disconnect_Command > | auto_observer_disconnect_ |
Friends | |
class | Observer |
Make update_consumer () accessible to Observer. | |
Classes | |
class | Observer |
Observes changes in the EC consumer subscriptions and notifies TAO_ECG_Mcast_EH when there are changes. More... | |
class | Observer_Disconnect_Command |
Disconnects Observer from the Event Channel. More... | |
struct | Subscription |
This object acts as an Observer of the Event Channel. It subscribes to the multicast groups that carry events matching the EC's subscriptions. It then receives callbacks from the Reactor when data is available on the mcast sockets and alerts TAO_ECG_Dgram_Handler, which reads the data, transforms it into an event, and pushes it to the Event Channel.
Definition at line 52 of file ECG_Mcast_EH.h.
typedef ACE_Unbounded_Set<ACE_INET_Addr> TAO_ECG_Mcast_EH::Address_Set [private] |
Definition at line 148 of file ECG_Mcast_EH.h.
typedef ACE_Array_Base<Subscription> TAO_ECG_Mcast_EH::Subscriptions [private] |
Definition at line 230 of file ECG_Mcast_EH.h.
TAO_ECG_Mcast_EH::TAO_ECG_Mcast_EH | ( | TAO_ECG_Dgram_Handler * | recv, | |
const ACE_TCHAR * | net_if = 0, | |
CORBA::ULong | buf_sz = 0 | |
) |
Initialization and termination methods.
Constructor. Messages received by this EH are forwarded to recv. net_if can be used to specify the NIC on which multicast messages are expected. buf_sz can be used to alter the default buffer size.
See comments for receiver_ data member on why raw pointer is used for the recv argument.
Definition at line 20 of file ECG_Mcast_EH.cpp.
References ACE_ASSERT.
  : net_if_ (net_if ? ACE_OS::strdup (net_if) : 0)
  , subscriptions_ ()
  , receiver_ (recv)
  , recvbuf_size_ (sz)
  , observer_ ()
  , auto_observer_disconnect_ ()
{
  ACE_ASSERT (this->receiver_);
}
TAO_ECG_Mcast_EH::~TAO_ECG_Mcast_EH | ( | void | ) | [virtual] |
Destructor.
Definition at line 33 of file ECG_Mcast_EH.cpp.
References ACE_OS::free().
{
  ACE_OS::free (this->net_if_);
}
void TAO_ECG_Mcast_EH::add_new_subscriptions | ( | Address_Set & | multicast_addresses | ) | [private] |
multicast_addresses | List of multicast addresses to which we need to subscribe in order to receive all event types in the current consumer subscriptions. |
Definition at line 231 of file ECG_Mcast_EH.cpp.
References ACE_ERROR, ACE_NEW, ACE_NONBLOCK, ACE_Unbounded_Set_Ex< T, C >::begin(), ACE_Unbounded_Set_Ex< T, C >::end(), ENOTSUP, LM_ERROR, ACE_Event_Handler::READ_MASK, ACE_Array_Base< T >::size(), SO_RCVBUF, socket(), SOL_SOCKET, and subscriptions_.
Referenced by update_consumer().
{
  typedef ACE_Unbounded_Set_Iterator<ACE_INET_Addr> Address_Iterator;
  for (Address_Iterator k = multicast_addresses.begin ();
       k != multicast_addresses.end ();
       ++k)
    {
      Subscription new_subscription;
      new_subscription.mcast_addr = *k;
      ACE_NEW (new_subscription.dgram, ACE_SOCK_Dgram_Mcast);

      size_t const subscriptions_size = this->subscriptions_.size ();
      this->subscriptions_.size (subscriptions_size + 1);
      this->subscriptions_[subscriptions_size] = new_subscription;

      ACE_SOCK_Dgram_Mcast *socket = new_subscription.dgram;

      if (socket->open (new_subscription.mcast_addr, this->net_if_, 1) == -1) {
        ACE_ERROR ((LM_ERROR,
                    "Error: %d - Unable to open multicast socket\n",
                    errno ));
      }

      if ( socket->enable (ACE_NONBLOCK) != 0 ) {
        ACE_ERROR ((LM_ERROR,
                    "Error: %d - Unable to enable nonblocking on mcast_eh\n",
                    errno ));
      }

      if (socket->join (new_subscription.mcast_addr, 1, this->net_if_) == -1) {
        ACE_ERROR ((LM_ERROR,
                    "Error: %d - Unable to join multicast group\n",
                    errno ));
      }

      if (this->recvbuf_size_ != 0
          && (((ACE_SOCK_Dgram *)socket)->set_option(SOL_SOCKET,
                                                     SO_RCVBUF,
                                                     (void *) &this->recvbuf_size_,
                                                     sizeof (this->recvbuf_size_)) == -1)
          && errno != ENOTSUP )
        {
          ACE_ERROR ((LM_ERROR,
                      "Error: %d - Unable to set mcast_eh recvbuf_size:%d\n",
                      errno,
                      this->recvbuf_size_));
        }
      (void) this->reactor ()->register_handler (
                                                 socket->get_handle (),
                                                 this,
                                                 ACE_Event_Handler::READ_MASK);
    }
}
void TAO_ECG_Mcast_EH::compute_required_subscriptions | ( | const RtecEventChannelAdmin::ConsumerQOS & | sub, | |
Address_Set & | multicast_addresses | |||
) | [private] |
sub | The list of event types that our event channel consumers are interested in. | |
multicast_addresses | This method populates this list with the multicast addresses that we need to be subscribed to in order to receive the event types specified in sub. |
CORBA::SystemException | This method needs to perform several CORBA invocations, and it propagates any exceptions back to the caller. |
Definition at line 146 of file ECG_Mcast_EH.cpp.
References ACE_ES_EVENT_UNDEFINED, RtecEventChannelAdmin::ConsumerQOS::dependencies, TAO_ECG_Dgram_Handler::get_addr(), TAO_ECG_Dgram_Handler::get_address(), inet_addr(), ACE_Unbounded_Set_Ex< T, C >::insert(), RtecUDPAdmin::UDP_Addr::ipaddr, RtecUDPAdmin::UDP_Addr::port, receiver_, RtecUDPAdmin::Rtec_inet, and RtecUDPAdmin::Rtec_inet6.
Referenced by update_consumer().
{
  CORBA::ULong count = sub.dependencies.length ();
  for (CORBA::ULong i = 0; i != count; ++i)
    {
      const RtecEventComm::EventHeader& header =
        sub.dependencies[i].event.header;
      if (0 < header.type && header.type < ACE_ES_EVENT_UNDEFINED)
        {
          continue;
        }

      ACE_INET_Addr inet_addr;
      try
        {
          // Grab the right mcast group for this event...
          RtecUDPAdmin::UDP_Address_var udp_addr;

          this->receiver_->get_address (header, udp_addr.out());
          switch (udp_addr->_d())
            {
            case RtecUDPAdmin::Rtec_inet:
              inet_addr.set(udp_addr->v4_addr().port,
                            udp_addr->v4_addr().ipaddr);
              break;
            case RtecUDPAdmin::Rtec_inet6:
#if defined (ACE_HAS_IPV6)
              inet_addr.set_type(PF_INET6);
#endif
              inet_addr.set_address(udp_addr->v6_addr().ipaddr,16,0);
              inet_addr.set_port_number(udp_addr->v6_addr().port);
              break;
            }
        }
      catch (const ::CORBA::BAD_OPERATION &)
        {
          // server only supports IPv4
          // Grab the right mcast group for this event...
          RtecUDPAdmin::UDP_Addr udp_addr;
          this->receiver_->get_addr (header, udp_addr);
          inet_addr.set (udp_addr.port, udp_addr.ipaddr);
        }

      // Ignore errors, if the element is in the set we simply ignore
      // the problem...
      (void) multicast_addresses.insert (inet_addr);
    }
}
int TAO_ECG_Mcast_EH::delete_unwanted_subscriptions | ( | Address_Set & | multicast_addresses | ) | [private] |
multicast_addresses | List of multicast addresses we need to be subscribed to in order to receive all event types in the current consumer subscriptions. |
Definition at line 198 of file ECG_Mcast_EH.cpp.
References ACE_Unbounded_Set_Ex< T, C >::find(), ACE_Event_Handler::READ_MASK, ACE_Unbounded_Set_Ex< T, C >::remove(), ACE_Array_Base< T >::size(), socket(), and subscriptions_.
Referenced by update_consumer().
{
  for (size_t i = 0; i < this->subscriptions_.size (); ++i)
    {
      ACE_INET_Addr multicast_group = this->subscriptions_[i].mcast_addr;
      if (multicast_addresses.find (multicast_group))
        {
          // Remove from the list of subscriptions to be added,
          // because we already subscribe to it...
          (void) multicast_addresses.remove (multicast_group);
          continue;
        }

      // This subscription is no longer needed - remove from reactor,
      // close and delete the socket.
      ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
      (void) this->reactor ()->remove_handler (socket->get_handle (),
                                               ACE_Event_Handler::READ_MASK);
      (void) socket->close();
      delete socket;
      // Move the deleted subscription out of the <subscriptions_>
      // array by moving the last subscription in array into its place.
      this->subscriptions_[i] =
        this->subscriptions_[this->subscriptions_.size () - 1];
      this->subscriptions_.size (this->subscriptions_.size () - 1);
      --i;
    }

  return 0;
}
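The removal idiom used in the listing above (overwrite the slot with the last element, shrink the array by one, then re-examine the same index) can be sketched with standard containers. This is only an illustration under stated assumptions, not the TAO code: `prune_subscriptions` is a hypothetical name, addresses are plain strings instead of ACE_INET_Addr, and `std::vector` / `std::set` stand in for ACE_Array_Base and ACE_Unbounded_Set. Socket cleanup is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for delete_unwanted_subscriptions().
// Keeps only the current subscriptions that are still wanted, and removes
// from `wanted` every address that is already subscribed, so that `wanted`
// ends up holding only the addresses that still have to be joined.
// Returns the number of subscriptions that were dropped.
std::size_t prune_subscriptions(std::vector<std::string>& current,
                                std::set<std::string>& wanted)
{
    std::size_t dropped = 0;
    std::size_t i = 0;
    while (i < current.size()) {
        // erase() returns 1 when the address was present in `wanted`.
        if (wanted.erase(current[i]) > 0) {
            ++i;  // already subscribed and still wanted: keep it
            continue;
        }
        // No longer wanted: move the last entry into this slot and shrink.
        // The index is not advanced, so the moved entry is examined next.
        assert(!current.empty());
        current[i] = current.back();
        current.pop_back();
        ++dropped;
    }
    return dropped;
}
```

Not advancing the index after a removal plays the same role as the `--i` in the listing, without relying on unsigned wrap-around when the first element is removed.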
int TAO_ECG_Mcast_EH::handle_input | ( | ACE_HANDLE | fd | ) | [virtual] |
Reactor callback. Notify receiver_ that a dgram corresponding to fd is ready for reading.
Reimplemented from ACE_Event_Handler.
Definition at line 117 of file ECG_Mcast_EH.cpp.
References ACE_Array_Base< T >::size(), socket(), and subscriptions_.
{
  size_t const subscriptions_size = this->subscriptions_.size ();
  for (size_t i = 0; i != subscriptions_size; ++i)
    {
      ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
      if (socket->get_handle () == fd)
        {
          return this->receiver_->handle_input (*socket);
        }
    }
  return -1;
}
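The dispatch step above (find the subscription that owns the ready handle, then forward to the receiver) can be sketched without ACE. All names here are hypothetical stand-ins: a handle is an `int`, `SubscriptionSketch` replaces Subscription, and a `std::function` replaces the TAO_ECG_Dgram_Handler callback.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical stand-in for Subscription: only the handle matters here.
struct SubscriptionSketch {
    int handle;
};

// Linear search for the subscription whose handle equals `fd`, then forward
// to `receiver`. Returns the receiver's result, or -1 when no subscription
// owns `fd`, mirroring the return values of the listing above.
int dispatch_input(const std::vector<SubscriptionSketch>& subscriptions,
                   int fd,
                   const std::function<int(const SubscriptionSketch&)>& receiver)
{
    assert(receiver);  // the real constructor also asserts a non-null receiver
    for (const SubscriptionSketch& s : subscriptions) {
        if (s.handle == fd)
            return receiver(s);
    }
    return -1;
}
```

A linear scan is reasonable as long as the number of joined multicast groups stays small; a map keyed by handle would be an alternative design for larger sets.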
void TAO_ECG_Mcast_EH::open | ( | RtecEventChannelAdmin::EventChannel_ptr | ec | ) |
Register for changes in the EC subscription list. When the subscription list becomes non-empty we join the proper multicast groups (using Dgram_Handler to translate between event types and mcast groups) and the class registers itself with the reactor.
To ensure proper resource cleanup, if open () is successful, the user MUST call shutdown () when the handler is no longer needed (and while its reactor still exists).
Definition at line 39 of file ECG_Mcast_EH.cpp.
References ACE_ERROR, ACE_NEW, activate(), auto_observer_disconnect_, TAO_Objref_Var_T< T >::in(), CORBA::is_nil(), LM_ERROR, and observer_.
Referenced by TAO_ECG_Mcast_Gateway::init_handler().
{
  if (!this->receiver_)
    {
      // We are shut down.
      throw CORBA::INTERNAL();
    }

  if (CORBA::is_nil (ec))
    {
      ACE_ERROR ((LM_ERROR, "TAO_ECG_Mcast_EH::open(): "
                            "nil ec argument"));
      throw CORBA::INTERNAL ();
    }

  // Create and activate Event Channel Observer.
  ACE_NEW (this->observer_,
           Observer (this));

  if (!this->observer_.in ())
    {
      throw CORBA::NO_MEMORY ();
    }

  TAO_EC_Object_Deactivator observer_deactivator;
  RtecEventChannelAdmin::Observer_var observer_ref;
  PortableServer::POA_var poa =
    this->observer_->_default_POA ();

  activate (observer_ref,
            poa.in (),
            this->observer_.in (),
            observer_deactivator);

  RtecEventChannelAdmin::Observer_Handle handle =
    ec->append_observer (observer_ref.in ());

  this->observer_->set_deactivator (observer_deactivator);
  this->auto_observer_disconnect_.set_command
    (Observer_Disconnect_Command (handle, ec));
}
int TAO_ECG_Mcast_EH::shutdown | ( | void | ) | [virtual] |
TAO_ECG_Handler_Shutdown method.
Remove ourselves from the event channel, unsubscribe from the multicast groups, close the sockets and deregister from the reactor.
Implements TAO_ECG_Handler_Shutdown.
Definition at line 82 of file ECG_Mcast_EH.cpp.
References auto_observer_disconnect_, observer_, ACE_Event_Handler::READ_MASK, receiver_, ACE_Array_Base< T >::size(), and subscriptions_.
{
  // Already shut down.
  if (!this->receiver_)
    return -1;

  // Disconnect Observer from EC.
  this->auto_observer_disconnect_.execute ();

  // Shutdown the observer.
  if (this->observer_.in ())
    {
      this->observer_->shutdown ();
      this->observer_ = 0;
    }

  // Indicates that we are in a shutdown state.
  this->receiver_ = 0;

  // Deregister from reactor, close and clean up sockets.
  size_t const subscriptions_size = this->subscriptions_.size ();
  for (size_t i = 0; i != subscriptions_size; ++i)
    {
      (void) this->reactor ()->remove_handler (
                                               this->subscriptions_[i].dgram->get_handle (),
                                               ACE_Event_Handler::READ_MASK);
      (void) this->subscriptions_[i].dgram->close();
      delete this->subscriptions_[i].dgram;
    }
  this->subscriptions_.size (0);

  return 0;
}
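The listing above uses the receiver_ pointer as a "shut down" flag, so a second call to shutdown () returns -1 instead of releasing resources twice. A minimal sketch of that pattern, assuming a hypothetical `HandlerSketch` class in which an `int*` stands in for the receiver and a vector of integers stands in for the open sockets:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the handler: the non-owning receiver pointer
// doubles as the "still active" flag, as in the listing above.
class HandlerSketch {
public:
    explicit HandlerSketch(int* receiver) : receiver_(receiver)
    {
        assert(receiver_);  // a null receiver is a programming error
    }

    void add_socket(int handle) { sockets_.push_back(handle); }

    // Returns 0 on the first call and -1 on every later call.
    int shutdown()
    {
        if (!receiver_)
            return -1;        // already shut down
        receiver_ = nullptr;  // marks the shut-down state
        sockets_.clear();     // stands in for deregister, close and delete
        return 0;
    }

    std::size_t open_sockets() const { return sockets_.size(); }

private:
    int* receiver_;             // not owned
    std::vector<int> sockets_;  // stand-in for subscriptions_
};
```

Callers can therefore invoke `shutdown()` from several cleanup paths and treat the -1 result as "nothing left to do".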
void TAO_ECG_Mcast_EH::update_consumer | ( | const RtecEventChannelAdmin::ConsumerQOS & | sub | ) | [private] |
The Observer method. Subscribe/unsubscribe to multicast groups according to changes in consumer subscriptions.
Definition at line 132 of file ECG_Mcast_EH.cpp.
References add_new_subscriptions(), compute_required_subscriptions(), and delete_unwanted_subscriptions().
Referenced by TAO_ECG_Mcast_EH::Observer::update_consumer().
{
  Address_Set multicast_addresses;

  this->compute_required_subscriptions (sub,
                                        multicast_addresses);

  this->delete_unwanted_subscriptions (multicast_addresses);

  this->add_new_subscriptions (multicast_addresses);
}
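The three calls above amount to reconciling two sets: the groups currently joined and the groups now required. A sketch of that reconciliation with standard algorithms follows. It assumes hypothetical names (`SubscriptionDelta`, `diff_subscriptions`) and string addresses, and it computes the two differences up front rather than mutating one set in place as the real methods do.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <set>
#include <string>

// Hypothetical result type: what has to change to reach the required state.
struct SubscriptionDelta {
    std::set<std::string> to_leave;  // subscribed but no longer required
    std::set<std::string> to_join;   // required but not yet subscribed
};

// Computes both set differences between the current and required groups.
SubscriptionDelta diff_subscriptions(const std::set<std::string>& current,
                                     const std::set<std::string>& required)
{
    SubscriptionDelta delta;
    std::set_difference(current.begin(), current.end(),
                        required.begin(), required.end(),
                        std::inserter(delta.to_leave, delta.to_leave.end()));
    std::set_difference(required.begin(), required.end(),
                        current.begin(), current.end(),
                        std::inserter(delta.to_join, delta.to_join.end()));
    // Sanity check: nothing can be joined that was not required.
    assert(delta.to_join.size() <= required.size());
    return delta;
}
```

Groups present in both sets appear in neither result, which corresponds to the subscriptions that the handler leaves untouched.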
friend class Observer [friend] |
TAO_EC_Auto_Command<Observer_Disconnect_Command> TAO_ECG_Mcast_EH::auto_observer_disconnect_ [private] |
Manages connection of our observer to the Event Channel. ORDER DEPENDENCY: this member should be declared AFTER <observer_>.
Definition at line 274 of file ECG_Mcast_EH.h.
Referenced by open(), and shutdown().
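The ORDER DEPENDENCY note relies on a C++ language rule: non-static data members are destroyed in reverse order of declaration. Declaring the disconnect command after the observer means it is destroyed first, presumably so that the observer is disconnected from the Event Channel before the observer itself is released. The sketch below demonstrates only the language rule; `Tracer`, `OwnerSketch`, and `destruction_order` are hypothetical names.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Appends its name to a shared log when destroyed.
struct Tracer {
    Tracer(std::string name, std::vector<std::string>& log)
        : name_(std::move(name)), log_(log) {}
    ~Tracer() { log_.push_back(name_); }

    std::string name_;
    std::vector<std::string>& log_;
};

// Mirrors the declaration order of the two members discussed above.
struct OwnerSketch {
    explicit OwnerSketch(std::vector<std::string>& log)
        : observer_("observer", log),
          auto_disconnect_("auto_disconnect", log) {}

    Tracer observer_;         // declared first, destroyed last
    Tracer auto_disconnect_;  // declared second, destroyed first
};

// Returns the member names in the order their destructors ran.
std::vector<std::string> destruction_order()
{
    std::vector<std::string> log;
    {
        OwnerSketch owner(log);
    }  // owner goes out of scope here; its members are destroyed
    assert(log.size() == 2);
    return log;
}
```

Swapping the two member declarations in `OwnerSketch` would reverse the recorded order, which is why the header comment pins the declaration order.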
ACE_TCHAR* TAO_ECG_Mcast_EH::net_if_ [private] |
The NIC used to subscribe for multicast traffic.
TAO_EC_Servant_Var<Observer> TAO_ECG_Mcast_EH::observer_ [private] |
Event Channel Observer. Detects changes in EC consumer subscriptions. ORDER DEPENDENCY: this member should be declared before <auto_observer_disconnect_>.
Definition at line 270 of file ECG_Mcast_EH.h.
Referenced by open(), and shutdown().
TAO_ECG_Dgram_Handler* TAO_ECG_Mcast_EH::receiver_ [private] |
We callback to this object when a message arrives.
Definition at line 262 of file ECG_Mcast_EH.h.
Referenced by compute_required_subscriptions(), and shutdown().
CORBA::ULong TAO_ECG_Mcast_EH::recvbuf_size_ [private] |
SOCKbuf size.
Subscriptions TAO_ECG_Mcast_EH::subscriptions_ [private] |
List of multicast addresses we subscribe to and dgrams we use.
Definition at line 250 of file ECG_Mcast_EH.h.
Referenced by add_new_subscriptions(), delete_unwanted_subscriptions(), handle_input(), and shutdown().