TAO_ECG_Mcast_EH Class Reference

Event Handler for Mcast messages. NOT THREAD-SAFE. More...

#include <ECG_Mcast_EH.h>

Inheritance diagram for TAO_ECG_Mcast_EH:

Inheritance graph
[legend]
Collaboration diagram for TAO_ECG_Mcast_EH:

Collaboration graph
[legend]
List of all members.

Public Member Functions

virtual int handle_input (ACE_HANDLE fd)
 TAO_ECG_Mcast_EH (TAO_ECG_Dgram_Handler *recv, const ACE_TCHAR *net_if=0, CORBA::ULong buf_sz=0)
virtual ~TAO_ECG_Mcast_EH (void)
 Destructor.

void open (RtecEventChannelAdmin::EventChannel_ptr ec)
virtual int shutdown (void)
 TAO_ECG_Handler_Shutdown method.


Private Types

typedef ACE_Unbounded_Set<
ACE_INET_Addr
Address_Set
typedef ACE_Array_Base< SubscriptionSubscriptions

Private Member Functions

void update_consumer (const RtecEventChannelAdmin::ConsumerQOS &sub) throw (CORBA::SystemException)
void compute_required_subscriptions (const RtecEventChannelAdmin::ConsumerQOS &sub, Address_Set &multicast_addresses) throw (CORBA::SystemException)
int delete_unwanted_subscriptions (Address_Set &multicast_addresses)
void add_new_subscriptions (Address_Set &multicast_addresses)

Private Attributes

ACE_TCHARnet_if_
 The NIC used to subscribe for multicast traffic.

Subscriptions subscriptions_
 List of multicast addresses we subscribe to and dgrams we use.

TAO_ECG_Dgram_Handlerreceiver_
 We callback to this object when a message arrives.

CORBA::ULong recvbuf_size_
 SOCKbuf size.

TAO_EC_Servant_Var< Observerobserver_
TAO_EC_Auto_Command< Observer_Disconnect_Commandauto_observer_disconnect_

Friends

class Observer
 Make update_consumer () accessible to Observer.


Detailed Description

Event Handler for Mcast messages. NOT THREAD-SAFE.

This object acts as an Observer to Event Channel. It subscribes to multicast groups that carry events matching the EC's subscriptions. This object then receives callbacks from the Reactor when data is available on the mcast sockets and alerts TAO_ECG_Dgram_Handler, which reads the data, transforms it into event and pushes to the Event Channel.

Definition at line 52 of file ECG_Mcast_EH.h.


Member Typedef Documentation

typedef ACE_Unbounded_Set<ACE_INET_Addr> TAO_ECG_Mcast_EH::Address_Set [private]
 

Definition at line 155 of file ECG_Mcast_EH.h.

Referenced by add_new_subscriptions(), compute_required_subscriptions(), delete_unwanted_subscriptions(), and update_consumer().

typedef ACE_Array_Base<Subscription> TAO_ECG_Mcast_EH::Subscriptions [private]
 

Definition at line 239 of file ECG_Mcast_EH.h.


Constructor & Destructor Documentation

TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_ECG_Mcast_EH::TAO_ECG_Mcast_EH TAO_ECG_Dgram_Handler recv,
const ACE_TCHAR net_if = 0,
CORBA::ULong  buf_sz = 0
 

Constructor. Messages received by this EH will be forwarded to the recv. net_if can be used to specify NIC where multicast messages are expected. would be used to alter the default buffer size.

See comments for receiver_ data member on why raw pointer is used for the recv argument.

Definition at line 20 of file ECG_Mcast_EH.cpp.

References ACE_ASSERT.

00023   : net_if_ (net_if ? ACE_OS::strdup (net_if) : 0)
00024   , subscriptions_ ()
00025   , receiver_ (recv)
00026   , recvbuf_size_ (sz)
00027   , observer_ ()
00028   , auto_observer_disconnect_ ()
00029 {
00030   ACE_ASSERT (this->receiver_);
00031 }

TAO_ECG_Mcast_EH::~TAO_ECG_Mcast_EH void   )  [virtual]
 

Destructor.

Definition at line 33 of file ECG_Mcast_EH.cpp.

References ACE_OS::free().

00034 {
00035   ACE_OS::free (this->net_if_);
00036 }


Member Function Documentation

void TAO_ECG_Mcast_EH::add_new_subscriptions Address_Set multicast_addresses  )  [private]
 

Parameters:
multicast_addresses List of multicast addresses to which we need to subscribe to in order to be receiving all event types in the current consumer subscriptions.

Definition at line 217 of file ECG_Mcast_EH.cpp.

References ACE_ERROR, ACE_NEW, ACE_NONBLOCK, Address_Set, ACE_Unbounded_Set< T >::begin(), TAO_ECG_Mcast_EH::Subscription::dgram, ACE_Unbounded_Set< T >::end(), ENOTSUP, LM_ERROR, TAO_ECG_Mcast_EH::Subscription::mcast_addr, ACE_Event_Handler::reactor(), recvbuf_size_, ACE_Reactor::register_handler(), ACE_Array_Base< T >::size(), and subscriptions_.

00218 {
00219   typedef ACE_Unbounded_Set_Iterator<ACE_INET_Addr> Address_Iterator;
00220   for (Address_Iterator k = multicast_addresses.begin ();
00221        k != multicast_addresses.end ();
00222        ++k)
00223     {
00224       Subscription new_subscription;
00225       new_subscription.mcast_addr = *k;
00226       ACE_NEW (new_subscription.dgram, ACE_SOCK_Dgram_Mcast);
00227 
00228       size_t subscriptions_size = this->subscriptions_.size ();
00229       this->subscriptions_.size (subscriptions_size + 1);
00230       this->subscriptions_[subscriptions_size] = new_subscription;
00231 
00232       ACE_SOCK_Dgram_Mcast *socket = new_subscription.dgram;
00233       socket->subscribe (new_subscription.mcast_addr, 1, this->net_if_);
00234       if ( socket->enable (ACE_NONBLOCK) != 0 ) {
00235         ACE_ERROR ((LM_ERROR,
00236                     "Error: %d - Unable to enable nonblocking on mcast_eh\n",
00237                     errno ));
00238       }
00239 
00240       if (this->recvbuf_size_ != 0
00241           && (((ACE_SOCK_Dgram *)socket)->set_option(SOL_SOCKET,
00242                                                      SO_RCVBUF,
00243                                                      (void *) &this->recvbuf_size_,
00244                                                      sizeof (this->recvbuf_size_)) == -1)
00245           && errno != ENOTSUP )
00246         {
00247           ACE_ERROR ((LM_ERROR,
00248                       "Error: %d - Unable to set mcast_eh recvbuf_size:%d\n",
00249                       errno,
00250                       this->recvbuf_size_));
00251         }
00252       (void) this->reactor ()->register_handler (
00253                                          socket->get_handle (),
00254                                          this,
00255                                          ACE_Event_Handler::READ_MASK);
00256     }
00257 }

void TAO_ECG_Mcast_EH::compute_required_subscriptions const RtecEventChannelAdmin::ConsumerQOS sub,
Address_Set multicast_addresses
throw (CORBA::SystemException) [private]
 

Parameters:
sub The list of event types that our event channel consumers are interested in.
multicast_addresses This method populates this list with multicast addresses that we need to be subscribed to in order to receive event types specified in /a sub.
Exceptions:
CORBA::SystemException This method needs to perform several CORBA invocations, and it propagates any exceptions back to the caller.

Definition at line 156 of file ECG_Mcast_EH.cpp.

References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_ES_EVENT_UNDEFINED, Address_Set, RtecUDPAdmin::UDP_Addr::ipaddr, RtecUDPAdmin::UDP_Addr::port, and RtecEventComm::EventHeader::type.

00161 {
00162   CORBA::ULong count = sub.dependencies.length ();
00163   for (CORBA::ULong i = 0; i != count; ++i)
00164     {
00165       const RtecEventComm::EventHeader& header =
00166                            sub.dependencies[i].event.header;
00167       if (0 < header.type && header.type < ACE_ES_EVENT_UNDEFINED)
00168         {
00169           continue;
00170         }
00171       RtecUDPAdmin::UDP_Addr addr;
00172 
00173       this->receiver_->get_addr (header, addr ACE_ENV_ARG_PARAMETER);
00174       ACE_CHECK;
00175 
00176       ACE_INET_Addr inet_addr (addr.port, addr.ipaddr);
00177       // Ignore errors, if the element is in the set we simply ignore
00178       // the problem...
00179       (void) multicast_addresses.insert (inet_addr);
00180     }
00181 }

int TAO_ECG_Mcast_EH::delete_unwanted_subscriptions Address_Set multicast_addresses  )  [private]
 

Parameters:
multicast_addresses List of multicast addresses we need to be subscribed to in order receive all event types in the current consumer subscriptions.

Definition at line 184 of file ECG_Mcast_EH.cpp.

References Address_Set, ACE_Unbounded_Set< T >::find(), ACE_Event_Handler::reactor(), ACE_Unbounded_Set< T >::remove(), ACE_Reactor::remove_handler(), ACE_Array_Base< T >::size(), and subscriptions_.

00186 {
00187   for (size_t i = 0; i < this->subscriptions_.size (); ++i)
00188     {
00189       ACE_INET_Addr multicast_group = this->subscriptions_[i].mcast_addr;
00190       if (multicast_addresses.find (multicast_group))
00191         {
00192           // Remove from the list of subscriptions to be added,
00193           // because we already subscribe to it...
00194           (void) multicast_addresses.remove (multicast_group);
00195           continue;
00196         }
00197 
00198       // This subscription is no longer needed - remove from reactor,
00199       // close and delete the socket.
00200       ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
00201       (void) this->reactor ()->remove_handler (socket->get_handle (),
00202                                                ACE_Event_Handler::READ_MASK);
00203       (void) socket->close();
00204       delete socket;
00205       // Move the deleted subscription out of the <subscriptions_>
00206       // array by moving the last subscription in array into its place.
00207       this->subscriptions_[i] =
00208         this->subscriptions_[this->subscriptions_.size () - 1];
00209       this->subscriptions_.size (this->subscriptions_.size () - 1);
00210       --i;
00211     }
00212 
00213     return 0;
00214 }

int TAO_ECG_Mcast_EH::handle_input ACE_HANDLE  fd  )  [virtual]
 

Reactor callback. Notify receiver_ that a dgram corresponding to fd is ready for reading.

Reimplemented from ACE_Event_Handler.

Definition at line 123 of file ECG_Mcast_EH.cpp.

References TAO_ECG_Dgram_Handler::handle_input(), ACE_Array_Base< T >::size(), and subscriptions_.

00124 {
00125   size_t subscriptions_size = this->subscriptions_.size ();
00126   for (size_t i = 0; i != subscriptions_size; ++i)
00127     {
00128       ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
00129       if (socket->get_handle () == fd)
00130         {
00131           return this->receiver_->handle_input (*socket);
00132         }
00133     }
00134   return -1;
00135 }

void TAO_ECG_Mcast_EH::open RtecEventChannelAdmin::EventChannel_ptr  ec  ) 
 

Register for changes in the EC subscription list. When the subscription list becomes non-empty we join the proper multicast groups (using Dgram_Handler to translate between event types and mcast groups) and the class registers itself with the reactor.

To insure proper resource clean up, if open () is successful, the user MUST call shutdown () when handler is no longer needed (and its reactor still exists).

Definition at line 39 of file ECG_Mcast_EH.cpp.

References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_ERROR, ACE_NEW, ACE_THROW, activate(), auto_observer_disconnect_, TAO_EC_Servant_Var< Observer >::in(), CORBA::is_nil(), LM_ERROR, RtecEventChannelAdmin::Observer_Handle, and TAO_EC_Auto_Command< Observer_Disconnect_Command >::set_command().

Referenced by TAO_ECG_Mcast_Gateway::init_handler().

00041 {
00042   if (!this->receiver_)
00043     {
00044       // We are shut down.
00045       ACE_THROW (CORBA::INTERNAL());
00046     }
00047 
00048   if (CORBA::is_nil (ec))
00049     {
00050       ACE_ERROR ((LM_ERROR,  "TAO_ECG_Mcast_EH::open(): "
00051                              "nil ec argument"));
00052       ACE_THROW (CORBA::INTERNAL ());
00053     }
00054 
00055   // Create and activate Event Channel Observer.
00056   ACE_NEW (this->observer_,
00057            Observer (this));
00058 
00059   if (!this->observer_.in ())
00060     {
00061       ACE_THROW (CORBA::NO_MEMORY ());
00062     }
00063 
00064   TAO_EC_Object_Deactivator observer_deactivator;
00065   RtecEventChannelAdmin::Observer_var observer_ref;
00066   PortableServer::POA_var poa =
00067     this->observer_->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
00068   ACE_CHECK;
00069 
00070   activate (observer_ref,
00071             poa.in (),
00072             this->observer_.in (),
00073             observer_deactivator
00074             ACE_ENV_ARG_PARAMETER);
00075   ACE_CHECK;
00076 
00077   RtecEventChannelAdmin::Observer_Handle handle =
00078     ec->append_observer (observer_ref.in ()
00079                          ACE_ENV_ARG_PARAMETER);
00080   ACE_CHECK;
00081 
00082   this->observer_->set_deactivator (observer_deactivator);
00083   this->auto_observer_disconnect_.set_command
00084     (Observer_Disconnect_Command (handle, ec));
00085 }

int TAO_ECG_Mcast_EH::shutdown void   )  [virtual]
 

TAO_ECG_Handler_Shutdown method.

Remove ourselves from the event channel, unsubscribe from the multicast groups, close the sockets and deregister from the reactor.

Implements TAO_ECG_Handler_Shutdown.

Definition at line 88 of file ECG_Mcast_EH.cpp.

References auto_observer_disconnect_, TAO_EC_Auto_Command< Observer_Disconnect_Command >::execute(), TAO_EC_Servant_Var< Observer >::in(), ACE_Event_Handler::reactor(), ACE_Reactor::remove_handler(), ACE_Array_Base< T >::size(), and subscriptions_.

00089 {
00090   // Already shut down.
00091   if (!this->receiver_)
00092     return -1;
00093 
00094   // Disconnect Observer from EC.
00095   this->auto_observer_disconnect_.execute ();
00096 
00097   // Shutdown the observer.
00098   if (this->observer_.in ())
00099     {
00100       this->observer_->shutdown ();
00101       this->observer_ = 0;
00102     }
00103 
00104   // Indicates that we are in a shutdown state.
00105   this->receiver_ = 0;
00106 
00107   // Deregister from reactor, close and clean up sockets.
00108   size_t subscriptions_size = this->subscriptions_.size ();
00109   for (size_t i = 0; i != subscriptions_size; ++i)
00110     {
00111       (void) this->reactor ()->remove_handler (
00112                                this->subscriptions_[i].dgram->get_handle (),
00113                                ACE_Event_Handler::READ_MASK);
00114       (void) this->subscriptions_[i].dgram->close();
00115       delete this->subscriptions_[i].dgram;
00116     }
00117   this->subscriptions_.size (0);
00118 
00119   return 0;
00120 }

void TAO_ECG_Mcast_EH::update_consumer const RtecEventChannelAdmin::ConsumerQOS sub  )  throw (CORBA::SystemException) [private]
 

The Observer method. Subscribe/unsubscribe to multicast groups according to changes in consumer subscriptions.

Definition at line 138 of file ECG_Mcast_EH.cpp.

References ACE_CHECK, ACE_ENV_ARG_PARAMETER, and Address_Set.

00142 {
00143   Address_Set multicast_addresses;
00144 
00145   this->compute_required_subscriptions (sub,
00146                                         multicast_addresses
00147                                         ACE_ENV_ARG_PARAMETER);
00148   ACE_CHECK;
00149 
00150   this->delete_unwanted_subscriptions (multicast_addresses);
00151 
00152   this->add_new_subscriptions (multicast_addresses);
00153 }


Friends And Related Function Documentation

friend class Observer [friend]
 

Make update_consumer () accessible to Observer.

Definition at line 146 of file ECG_Mcast_EH.h.


Member Data Documentation

TAO_EC_Auto_Command<Observer_Disconnect_Command> TAO_ECG_Mcast_EH::auto_observer_disconnect_ [private]
 

Manages connection of our observer to the Event Channel. ORDER DEPENDENCY: this member should be declared AFTER .

Definition at line 283 of file ECG_Mcast_EH.h.

Referenced by open(), and shutdown().

ACE_TCHAR* TAO_ECG_Mcast_EH::net_if_ [private]
 

The NIC used to subscribe for multicast traffic.

Definition at line 233 of file ECG_Mcast_EH.h.

TAO_EC_Servant_Var<Observer> TAO_ECG_Mcast_EH::observer_ [private]
 

Event Channel Observer. Detects changes in EC consumer subscriptions. ORDER DEPENDENCY: this member should be declared before .

Definition at line 279 of file ECG_Mcast_EH.h.

TAO_ECG_Dgram_Handler* TAO_ECG_Mcast_EH::receiver_ [private]
 

We callback to this object when a message arrives.

Definition at line 271 of file ECG_Mcast_EH.h.

CORBA::ULong TAO_ECG_Mcast_EH::recvbuf_size_ [private]
 

SOCKbuf size.

Definition at line 274 of file ECG_Mcast_EH.h.

Referenced by add_new_subscriptions().

Subscriptions TAO_ECG_Mcast_EH::subscriptions_ [private]
 

List of multicast addresses we subscribe to and dgrams we use.

Definition at line 259 of file ECG_Mcast_EH.h.

Referenced by add_new_subscriptions(), delete_unwanted_subscriptions(), handle_input(), and shutdown().


The documentation for this class was generated from the following files:
Generated on Thu Nov 9 13:16:09 2006 for TAO_RTEvent by doxygen 1.3.6