TAO_ECG_Mcast_EH Class Reference

Event Handler for Mcast messages. NOT THREAD-SAFE.

#include <ECG_Mcast_EH.h>

Inherits ACE_Event_Handler and TAO_ECG_Handler_Shutdown.

Public Member Functions

virtual int handle_input (ACE_HANDLE fd)
 TAO_ECG_Mcast_EH (TAO_ECG_Dgram_Handler *recv, const ACE_TCHAR *net_if=0, CORBA::ULong buf_sz=0)
 Initialization and termination methods.
virtual ~TAO_ECG_Mcast_EH (void)
 Destructor.
void open (RtecEventChannelAdmin::EventChannel_ptr ec)
virtual int shutdown (void)
 TAO_ECG_Handler_Shutdown method.

Private Types

typedef ACE_Unbounded_Set< ACE_INET_Addr > Address_Set
typedef ACE_Array_Base< Subscription > Subscriptions

Private Member Functions

void update_consumer (const RtecEventChannelAdmin::ConsumerQOS &sub)
void compute_required_subscriptions (const RtecEventChannelAdmin::ConsumerQOS &sub, Address_Set &multicast_addresses)
int delete_unwanted_subscriptions (Address_Set &multicast_addresses)
void add_new_subscriptions (Address_Set &multicast_addresses)

Private Attributes

ACE_TCHAR * net_if_
 The NIC used to subscribe for multicast traffic.
Subscriptions subscriptions_
 List of multicast addresses we subscribe to and dgrams we use.
TAO_ECG_Dgram_Handler * receiver_
 We call back to this object when a message arrives.
CORBA::ULong recvbuf_size_
 Socket receive buffer size (applied with SO_RCVBUF); 0 keeps the default.
TAO_EC_Servant_Var< Observer > observer_
TAO_EC_Auto_Command< Observer_Disconnect_Command > auto_observer_disconnect_

Friends

class Observer
 Make update_consumer () accessible to Observer.

Classes

class  Observer
 Observes changes in the EC consumer subscriptions and notifies TAO_ECG_Mcast_EH when there are changes.
class  Observer_Disconnect_Command
 Disconnects Observer from the Event Channel.
struct  Subscription

Detailed Description

Event Handler for Mcast messages. NOT THREAD-SAFE.

This object acts as an Observer of the Event Channel. It subscribes to the multicast groups that carry events matching the EC's consumer subscriptions. It then receives callbacks from the Reactor when data is available on the mcast sockets and alerts TAO_ECG_Dgram_Handler, which reads the data, transforms it into an event and pushes it to the Event Channel.

Definition at line 52 of file ECG_Mcast_EH.h.


Member Typedef Documentation

typedef ACE_Unbounded_Set<ACE_INET_Addr> TAO_ECG_Mcast_EH::Address_Set [private]

Definition at line 148 of file ECG_Mcast_EH.h.

typedef ACE_Array_Base<Subscription> TAO_ECG_Mcast_EH::Subscriptions [private]

Definition at line 230 of file ECG_Mcast_EH.h.


Constructor & Destructor Documentation

TAO_ECG_Mcast_EH::TAO_ECG_Mcast_EH ( TAO_ECG_Dgram_Handler * recv,
const ACE_TCHAR * net_if = 0,
CORBA::ULong  buf_sz = 0 
)

Initialization and termination methods.

Constructor. Messages received by this EH are forwarded to recv. net_if can be used to specify the NIC on which multicast messages are expected. buf_sz can be used to alter the default socket receive buffer size; 0 leaves the default unchanged.

See comments for receiver_ data member on why raw pointer is used for the recv argument.

Definition at line 20 of file ECG_Mcast_EH.cpp.

References ACE_ASSERT.

00023   : net_if_ (net_if ? ACE_OS::strdup (net_if) : 0)
00024   , subscriptions_ ()
00025   , receiver_ (recv)
00026   , recvbuf_size_ (sz)
00027   , observer_ ()
00028   , auto_observer_disconnect_ ()
00029 {
00030   ACE_ASSERT (this->receiver_);
00031 }

TAO_ECG_Mcast_EH::~TAO_ECG_Mcast_EH ( void   )  [virtual]

Destructor.

Definition at line 33 of file ECG_Mcast_EH.cpp.

References ACE_OS::free().

00034 {
00035   ACE_OS::free (this->net_if_);
00036 }
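The constructor and destructor above pair an optional strdup with an unconditional free. The following is a minimal sketch of that ownership pattern using only the C++ standard library; dup_or_null and NetIfHolder are hypothetical names introduced for illustration and are not part of ACE or TAO.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <cstring>

// Hypothetical helper: duplicate a C string with malloc, or return nullptr
// for a null input, so the result can always be handed to std::free.
inline char* dup_or_null(const char* s) {
  if (s == nullptr) return nullptr;
  const std::size_t n = std::strlen(s) + 1;  // include the terminator
  char* copy = static_cast<char*>(std::malloc(n));
  if (copy != nullptr) std::memcpy(copy, s, n);
  return copy;
}

// Hypothetical holder that owns a private copy of the interface name.
class NetIfHolder {
 public:
  explicit NetIfHolder(const char* net_if = nullptr)
      : net_if_(dup_or_null(net_if)) {}
  ~NetIfHolder() { std::free(net_if_); }  // free(nullptr) is a no-op
  NetIfHolder(const NetIfHolder&) = delete;
  NetIfHolder& operator=(const NetIfHolder&) = delete;
  const char* net_if() const { return net_if_; }

 private:
  char* net_if_;
};
```

Because the holder keeps its own copy, the caller's buffer may be modified or released after construction. Copying is disabled in the sketch to avoid a double free.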


Member Function Documentation

void TAO_ECG_Mcast_EH::add_new_subscriptions ( Address_Set & multicast_addresses  )  [private]

Parameters:
multicast_addresses List of multicast addresses to which we need to subscribe in order to receive all event types in the current consumer subscriptions.

Definition at line 231 of file ECG_Mcast_EH.cpp.

References ACE_ERROR, ACE_NEW, ACE_NONBLOCK, ACE_Unbounded_Set_Ex< T, C >::begin(), ACE_Unbounded_Set_Ex< T, C >::end(), ENOTSUP, LM_ERROR, ACE_Event_Handler::READ_MASK, ACE_Array_Base< T >::size(), SO_RCVBUF, socket(), SOL_SOCKET, and subscriptions_.

Referenced by update_consumer().

00232 {
00233   typedef ACE_Unbounded_Set_Iterator<ACE_INET_Addr> Address_Iterator;
00234   for (Address_Iterator k = multicast_addresses.begin ();
00235        k != multicast_addresses.end ();
00236        ++k)
00237     {
00238       Subscription new_subscription;
00239       new_subscription.mcast_addr = *k;
00240       ACE_NEW (new_subscription.dgram, ACE_SOCK_Dgram_Mcast);
00241 
00242       size_t const subscriptions_size = this->subscriptions_.size ();
00243       this->subscriptions_.size (subscriptions_size + 1);
00244       this->subscriptions_[subscriptions_size] = new_subscription;
00245 
00246       ACE_SOCK_Dgram_Mcast *socket = new_subscription.dgram;
00247 
00248       if (socket->open (new_subscription.mcast_addr, this->net_if_, 1) == -1) {
00249         ACE_ERROR ((LM_ERROR,
00250                     "Error: %d - Unable to open multicast socket\n",
00251                     errno ));
00252       }
00253 
00254       if ( socket->enable (ACE_NONBLOCK) != 0 ) {
00255         ACE_ERROR ((LM_ERROR,
00256                     "Error: %d - Unable to enable nonblocking on mcast_eh\n",
00257                     errno ));
00258       }
00259 
00260       if (socket->join (new_subscription.mcast_addr, 1, this->net_if_) == -1) {
00261         ACE_ERROR ((LM_ERROR,
00262                     "Error: %d - Unable to join multicast group\n",
00263                     errno ));
00264       }
00265 
00266       if (this->recvbuf_size_ != 0
00267           && (((ACE_SOCK_Dgram *)socket)->set_option(SOL_SOCKET,
00268                                                      SO_RCVBUF,
00269                                                      (void *) &this->recvbuf_size_,
00270                                                      sizeof (this->recvbuf_size_)) == -1)
00271           && errno != ENOTSUP )
00272         {
00273           ACE_ERROR ((LM_ERROR,
00274                       "Error: %d - Unable to set mcast_eh recvbuf_size:%d\n",
00275                       errno,
00276                       this->recvbuf_size_));
00277         }
00278       (void) this->reactor ()->register_handler (
00279                                          socket->get_handle (),
00280                                          this,
00281                                          ACE_Event_Handler::READ_MASK);
00282     }
00283 }

void TAO_ECG_Mcast_EH::compute_required_subscriptions ( const RtecEventChannelAdmin::ConsumerQOS & sub,
Address_Set & multicast_addresses 
) [private]

Parameters:
sub The list of event types that our event channel consumers are interested in.
multicast_addresses This method populates this list with the multicast addresses that we need to be subscribed to in order to receive the event types specified in sub.
Exceptions:
CORBA::SystemException This method needs to perform several CORBA invocations, and it propagates any exceptions back to the caller.

Definition at line 146 of file ECG_Mcast_EH.cpp.

References ACE_ES_EVENT_UNDEFINED, RtecEventChannelAdmin::ConsumerQOS::dependencies, TAO_ECG_Dgram_Handler::get_addr(), TAO_ECG_Dgram_Handler::get_address(), inet_addr(), ACE_Unbounded_Set_Ex< T, C >::insert(), RtecUDPAdmin::UDP_Addr::ipaddr, RtecUDPAdmin::UDP_Addr::port, receiver_, RtecUDPAdmin::Rtec_inet, and RtecUDPAdmin::Rtec_inet6.

Referenced by update_consumer().

00149 {
00150   CORBA::ULong count = sub.dependencies.length ();
00151   for (CORBA::ULong i = 0; i != count; ++i)
00152     {
00153       const RtecEventComm::EventHeader& header =
00154                            sub.dependencies[i].event.header;
00155       if (0 < header.type && header.type < ACE_ES_EVENT_UNDEFINED)
00156         {
00157           continue;
00158         }
00159 
00160       ACE_INET_Addr inet_addr;
00161       try
00162         {
00163           // Grab the right mcast group for this event...
00164           RtecUDPAdmin::UDP_Address_var udp_addr;
00165 
00166           this->receiver_->get_address (header, udp_addr.out());
00167           switch (udp_addr->_d())
00168             {
00169             case RtecUDPAdmin::Rtec_inet:
00170               inet_addr.set(udp_addr->v4_addr().port,
00171                             udp_addr->v4_addr().ipaddr);
00172               break;
00173             case RtecUDPAdmin::Rtec_inet6:
00174 #if defined (ACE_HAS_IPV6)
00175               inet_addr.set_type(PF_INET6);
00176 #endif
00177               inet_addr.set_address(udp_addr->v6_addr().ipaddr,16,0);
00178               inet_addr.set_port_number(udp_addr->v6_addr().port);
00179               break;
00180             }
00181         }
00182       catch (const ::CORBA::BAD_OPERATION &)
00183         {
00184           // server only supports IPv4
00185            // Grab the right mcast group for this event...
00186           RtecUDPAdmin::UDP_Addr udp_addr;
00187           this->receiver_->get_addr (header, udp_addr);
00188           inet_addr.set (udp_addr.port, udp_addr.ipaddr);
00189         }
00190 
00191       // Ignore errors, if the element is in the set we simply ignore
00192       // the problem...
00193       (void) multicast_addresses.insert (inet_addr);
00194     }
00195 }
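The loop above skips event types in the reserved range, maps each remaining header to a multicast address, and relies on set semantics to ignore duplicates. The sketch below models that logic with the standard library only. Address, kReservedUpperBound and the address_for callback are hypothetical stand-ins for ACE_INET_Addr, ACE_ES_EVENT_UNDEFINED and the receiver's address lookup; the numeric value of the bound is an assumption chosen for the example.

```cpp
#include <cassert>
#include <functional>
#include <set>
#include <string>
#include <vector>

using Address = std::string;  // hypothetical stand-in for ACE_INET_Addr

// Assumed value for illustration only; stands in for ACE_ES_EVENT_UNDEFINED.
constexpr unsigned kReservedUpperBound = 16;

// Builds the set of multicast addresses needed for the given event types.
inline std::set<Address> required_addresses(
    const std::vector<unsigned>& event_types,
    const std::function<Address(unsigned)>& address_for) {
  assert(address_for);
  std::set<Address> out;
  for (unsigned type : event_types) {
    // Types strictly between 0 and the bound are skipped, as in the listing.
    if (0 < type && type < kReservedUpperBound) continue;
    out.insert(address_for(type));  // a repeated address is simply ignored
  }
  return out;
}
```

Several event types may map to the same group, so the resulting set can be smaller than the number of subscriptions that produced it.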

int TAO_ECG_Mcast_EH::delete_unwanted_subscriptions ( Address_Set & multicast_addresses  )  [private]

Parameters:
multicast_addresses List of multicast addresses we need to be subscribed to in order to receive all event types in the current consumer subscriptions. On return, addresses that are already subscribed have been removed from this list.

Definition at line 198 of file ECG_Mcast_EH.cpp.

References ACE_Unbounded_Set_Ex< T, C >::find(), ACE_Event_Handler::READ_MASK, ACE_Unbounded_Set_Ex< T, C >::remove(), ACE_Array_Base< T >::size(), socket(), and subscriptions_.

Referenced by update_consumer().

00200 {
00201   for (size_t i = 0; i < this->subscriptions_.size (); ++i)
00202     {
00203       ACE_INET_Addr multicast_group = this->subscriptions_[i].mcast_addr;
00204       if (multicast_addresses.find (multicast_group) != -1)
00205         {
00206           // Remove from the list of subscriptions to be added,
00207           // because we already subscribe to it...
00208           (void) multicast_addresses.remove (multicast_group);
00209           continue;
00210         }
00211 
00212       // This subscription is no longer needed - remove from reactor,
00213       // close and delete the socket.
00214       ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
00215       (void) this->reactor ()->remove_handler (socket->get_handle (),
00216                                                ACE_Event_Handler::READ_MASK);
00217       (void) socket->close();
00218       delete socket;
00219       // Move the deleted subscription out of the <subscriptions_>
00220       // array by moving the last subscription in array into its place.
00221       this->subscriptions_[i] =
00222         this->subscriptions_[this->subscriptions_.size () - 1];
00223       this->subscriptions_.size (this->subscriptions_.size () - 1);
00224       --i;
00225     }
00226 
00227     return 0;
00228 }
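The listing above removes an array element by overwriting it with the last element and shrinking the array by one, then revisits the same index. A minimal sketch of that unordered removal idiom, assuming a std::vector in place of ACE_Array_Base; swap_remove_if is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Removes every element for which unwanted(element) is true. Order is not
// preserved: the last element is copied into the vacated slot.
template <typename T, typename Pred>
std::size_t swap_remove_if(std::vector<T>& items, Pred unwanted) {
  const std::size_t original_size = items.size();
  std::size_t removed = 0;
  for (std::size_t i = 0; i < items.size();) {
    if (unwanted(items[i])) {
      items[i] = items.back();  // self-assignment when i is the last slot
      items.pop_back();
      ++removed;                // do not advance: slot i holds a new element
    } else {
      ++i;
    }
  }
  assert(removed + items.size() == original_size);
  return removed;
}
```

The sketch advances the index only when an element is kept, which has the same effect as the `--i` followed by the loop's `++i` in the listing but avoids relying on unsigned wrap-around when i is 0.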

int TAO_ECG_Mcast_EH::handle_input ( ACE_HANDLE  fd  )  [virtual]

Reactor callback. Notify receiver_ that a dgram corresponding to fd is ready for reading.

Reimplemented from ACE_Event_Handler.

Definition at line 117 of file ECG_Mcast_EH.cpp.

References ACE_Array_Base< T >::size(), socket(), and subscriptions_.

00118 {
00119   size_t const subscriptions_size = this->subscriptions_.size ();
00120   for (size_t i = 0; i != subscriptions_size; ++i)
00121     {
00122       ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
00123       if (socket->get_handle () == fd)
00124         {
00125           return this->receiver_->handle_input (*socket);
00126         }
00127     }
00128   return -1;
00129 }
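The dispatch in handle_input() is a linear search for the subscription whose socket owns the ready handle. The sketch below shows the same lookup with standard types. Handle, SocketSketch and the receiver callback are hypothetical stand-ins for ACE_HANDLE, ACE_SOCK_Dgram_Mcast and TAO_ECG_Dgram_Handler::handle_input.

```cpp
#include <cassert>
#include <functional>
#include <vector>

using Handle = int;  // hypothetical stand-in for ACE_HANDLE

struct SocketSketch {  // hypothetical stand-in for ACE_SOCK_Dgram_Mcast
  Handle handle;
};

// Forwards to the receiver for the socket that owns fd. Returns the
// receiver's result, or -1 when no subscription matches, as in the listing.
inline int dispatch_input(
    const std::vector<SocketSketch>& sockets, Handle fd,
    const std::function<int(const SocketSketch&)>& receiver) {
  assert(receiver);
  for (const SocketSketch& s : sockets) {
    if (s.handle == fd) return receiver(s);
  }
  return -1;
}
```

A linear scan is adequate when the number of subscribed groups is small; a map keyed by handle would be an alternative if that assumption does not hold.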

void TAO_ECG_Mcast_EH::open ( RtecEventChannelAdmin::EventChannel_ptr  ec  ) 

Register for changes in the EC subscription list. When the subscription list becomes non-empty we join the proper multicast groups (using Dgram_Handler to translate between event types and mcast groups) and the class registers itself with the reactor.

To ensure proper resource cleanup, if open () is successful, the user MUST call shutdown () when the handler is no longer needed (and while its reactor still exists).
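One way to honor the open ()/shutdown () contract is a scope guard that calls shutdown () automatically. The following is only a sketch: ShutdownGuard and FakeHandler are hypothetical and not part of TAO, and the fake only loosely mimics the documented convention that shutdown () returns -1 when the handler is not active.

```cpp
#include <cassert>

// Hypothetical RAII helper: calls shutdown() on scope exit unless
// release() was called first.
template <typename Handler>
class ShutdownGuard {
 public:
  explicit ShutdownGuard(Handler& h) : handler_(&h) {}
  ~ShutdownGuard() {
    if (handler_ != nullptr) (void) handler_->shutdown();
  }
  ShutdownGuard(const ShutdownGuard&) = delete;
  ShutdownGuard& operator=(const ShutdownGuard&) = delete;
  void release() { handler_ = nullptr; }

 private:
  Handler* handler_;
};

// Minimal fake used only to exercise the guard.
struct FakeHandler {
  bool is_open = false;
  int shutdown_calls = 0;
  void open() { is_open = true; }
  int shutdown() {
    ++shutdown_calls;
    if (!is_open) return -1;  // nothing to shut down
    is_open = false;
    return 0;
  }
};
```

With a real handler, such a guard would have to be declared after the reactor's owner in the same scope, so that it is destroyed (and shutdown () runs) before the reactor goes away.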

Definition at line 39 of file ECG_Mcast_EH.cpp.

References ACE_ERROR, ACE_NEW, activate(), auto_observer_disconnect_, TAO_Objref_Var_T< T >::in(), CORBA::is_nil(), LM_ERROR, and observer_.

Referenced by TAO_ECG_Mcast_Gateway::init_handler().

00040 {
00041   if (!this->receiver_)
00042     {
00043       // We are shut down.
00044       throw CORBA::INTERNAL();
00045     }
00046 
00047   if (CORBA::is_nil (ec))
00048     {
00049       ACE_ERROR ((LM_ERROR,  "TAO_ECG_Mcast_EH::open(): "
00050                              "nil ec argument"));
00051       throw CORBA::INTERNAL ();
00052     }
00053 
00054   // Create and activate Event Channel Observer.
00055   ACE_NEW (this->observer_,
00056            Observer (this));
00057 
00058   if (!this->observer_.in ())
00059     {
00060       throw CORBA::NO_MEMORY ();
00061     }
00062 
00063   TAO_EC_Object_Deactivator observer_deactivator;
00064   RtecEventChannelAdmin::Observer_var observer_ref;
00065   PortableServer::POA_var poa =
00066     this->observer_->_default_POA ();
00067 
00068   activate (observer_ref,
00069             poa.in (),
00070             this->observer_.in (),
00071             observer_deactivator);
00072 
00073   RtecEventChannelAdmin::Observer_Handle handle =
00074     ec->append_observer (observer_ref.in ());
00075 
00076   this->observer_->set_deactivator (observer_deactivator);
00077   this->auto_observer_disconnect_.set_command
00078     (Observer_Disconnect_Command (handle, ec));
00079 }

int TAO_ECG_Mcast_EH::shutdown ( void   )  [virtual]

TAO_ECG_Handler_Shutdown method.

Remove ourselves from the event channel, unsubscribe from the multicast groups, close the sockets and deregister from the reactor.

Implements TAO_ECG_Handler_Shutdown.

Definition at line 82 of file ECG_Mcast_EH.cpp.

References auto_observer_disconnect_, observer_, ACE_Event_Handler::READ_MASK, receiver_, ACE_Array_Base< T >::size(), and subscriptions_.

00083 {
00084   // Already shut down.
00085   if (!this->receiver_)
00086     return -1;
00087 
00088   // Disconnect Observer from EC.
00089   this->auto_observer_disconnect_.execute ();
00090 
00091   // Shutdown the observer.
00092   if (this->observer_.in ())
00093     {
00094       this->observer_->shutdown ();
00095       this->observer_ = 0;
00096     }
00097 
00098   // Indicates that we are in a shutdown state.
00099   this->receiver_ = 0;
00100 
00101   // Deregister from reactor, close and clean up sockets.
00102   size_t const subscriptions_size = this->subscriptions_.size ();
00103   for (size_t i = 0; i != subscriptions_size; ++i)
00104     {
00105       (void) this->reactor ()->remove_handler (
00106                                this->subscriptions_[i].dgram->get_handle (),
00107                                ACE_Event_Handler::READ_MASK);
00108       (void) this->subscriptions_[i].dgram->close();
00109       delete this->subscriptions_[i].dgram;
00110     }
00111   this->subscriptions_.size (0);
00112 
00113   return 0;
00114 }

void TAO_ECG_Mcast_EH::update_consumer ( const RtecEventChannelAdmin::ConsumerQOS & sub  )  [private]

The Observer method. Subscribe/unsubscribe to multicast groups according to changes in consumer subscriptions.

Definition at line 132 of file ECG_Mcast_EH.cpp.

References add_new_subscriptions(), compute_required_subscriptions(), and delete_unwanted_subscriptions().

Referenced by TAO_ECG_Mcast_EH::Observer::update_consumer().

00134 {
00135   Address_Set multicast_addresses;
00136 
00137   this->compute_required_subscriptions (sub,
00138                                         multicast_addresses);
00139 
00140   this->delete_unwanted_subscriptions (multicast_addresses);
00141 
00142   this->add_new_subscriptions (multicast_addresses);
00143 }
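The three calls above amount to reconciling the currently subscribed groups with the required ones: leave what is no longer needed, keep the overlap, join what is new. A minimal sketch of that computation as two set differences, using std::set in place of Address_Set; Address, Delta and diff_subscriptions are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <set>
#include <string>

using Address = std::string;  // hypothetical stand-in for ACE_INET_Addr

struct Delta {
  std::set<Address> to_leave;  // subscribed but no longer required
  std::set<Address> to_join;   // required but not yet subscribed
};

// Computes which groups to leave and which to join.
inline Delta diff_subscriptions(const std::set<Address>& current,
                                const std::set<Address>& required) {
  Delta d;
  std::set_difference(current.begin(), current.end(),
                      required.begin(), required.end(),
                      std::inserter(d.to_leave, d.to_leave.end()));
  std::set_difference(required.begin(), required.end(),
                      current.begin(), current.end(),
                      std::inserter(d.to_join, d.to_join.end()));
  assert(d.to_leave.size() <= current.size());
  assert(d.to_join.size() <= required.size());
  return d;
}
```

The class itself does this in place: delete_unwanted_subscriptions () strips already-subscribed addresses from the required set, so whatever remains is exactly what add_new_subscriptions () must join.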


Friends And Related Function Documentation

friend class Observer [friend]

Make update_consumer () accessible to Observer.

Definition at line 141 of file ECG_Mcast_EH.h.


Member Data Documentation

TAO_EC_Auto_Command<Observer_Disconnect_Command> TAO_ECG_Mcast_EH::auto_observer_disconnect_ [private]

Manages connection of our observer to the Event Channel. ORDER DEPENDENCY: this member should be declared AFTER <observer_>.

Definition at line 274 of file ECG_Mcast_EH.h.

Referenced by open(), and shutdown().
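The ORDER DEPENDENCY note follows from a C++ rule: non-static data members are constructed in declaration order and destroyed in reverse declaration order, so a member declared later is destroyed first. The sketch below demonstrates the rule with hypothetical Tracer and HolderSketch types; it does not use the real TAO classes.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Records construction and destruction so the order can be inspected.
struct Tracer {
  Tracer(std::string n, std::vector<std::string>& log)
      : name_(std::move(n)), log_(log) {
    log_.push_back("ctor " + name_);
  }
  ~Tracer() { log_.push_back("dtor " + name_); }
  std::string name_;
  std::vector<std::string>& log_;
};

// Hypothetical holder mirroring the documented layout: observer is declared
// first, auto_disconnect second.
struct HolderSketch {
  explicit HolderSketch(std::vector<std::string>& log)
      : observer("observer", log), auto_disconnect("auto_disconnect", log) {}
  Tracer observer;
  Tracer auto_disconnect;
};

// Creates and destroys one holder and returns the recorded event order.
inline std::vector<std::string> run_order_demo() {
  std::vector<std::string> log;
  {
    HolderSketch h(log);
  }
  assert(log.size() == 4);
  return log;
}
```

In the sketch the later-declared member is torn down while the earlier one is still alive, which is presumably why the disconnect command must be declared after the observer it disconnects.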

ACE_TCHAR* TAO_ECG_Mcast_EH::net_if_ [private]

The NIC used to subscribe for multicast traffic.

Definition at line 224 of file ECG_Mcast_EH.h.

TAO_EC_Servant_Var<Observer> TAO_ECG_Mcast_EH::observer_ [private]

Event Channel Observer. Detects changes in EC consumer subscriptions. ORDER DEPENDENCY: this member should be declared before <auto_observer_disconnect_>.

Definition at line 270 of file ECG_Mcast_EH.h.

Referenced by open(), and shutdown().

TAO_ECG_Dgram_Handler* TAO_ECG_Mcast_EH::receiver_ [private]

We call back to this object when a message arrives.

Definition at line 262 of file ECG_Mcast_EH.h.

Referenced by compute_required_subscriptions(), and shutdown().

CORBA::ULong TAO_ECG_Mcast_EH::recvbuf_size_ [private]

Socket receive buffer size (applied with SO_RCVBUF); 0 keeps the default.

Definition at line 265 of file ECG_Mcast_EH.h.

Subscriptions TAO_ECG_Mcast_EH::subscriptions_ [private]

List of multicast addresses we subscribe to and dgrams we use.

Definition at line 250 of file ECG_Mcast_EH.h.

Referenced by add_new_subscriptions(), delete_unwanted_subscriptions(), handle_input(), and shutdown().


The documentation for this class was generated from the following files:
ECG_Mcast_EH.h
ECG_Mcast_EH.cpp
Generated on Tue Feb 2 17:44:38 2010 for TAO_RTEvent by  doxygen 1.4.7