ECG_Mcast_EH.cpp

Go to the documentation of this file.
00001 // $Id: ECG_Mcast_EH.cpp 78550 2007-06-05 19:01:50Z mesnier_p $
00002 
00003 #include "orbsvcs/Event/ECG_Mcast_EH.h"
00004 #include "orbsvcs/Event/ECG_UDP_Receiver.h"
00005 #include "orbsvcs/Event_Service_Constants.h"
00006 #include "orbsvcs/Event/EC_Lifetime_Utils_T.h"
00007 #include "ace/Unbounded_Set.h"
00008 #include "ace/OS_NS_string.h"
00009 #include "ace/Reactor.h"
00010 #include "ace/os_include/os_fcntl.h"
00011 
00012 #if !defined(__ACE_INLINE__)
00013 #include "orbsvcs/Event/ECG_Mcast_EH.inl"
00014 #endif /* __ACE_INLINE__ */
00015 
00016 ACE_RCSID(Event, ECG_Mcast_EH, "$Id: ECG_Mcast_EH.cpp 78550 2007-06-05 19:01:50Z mesnier_p $")
00017 
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019 
00020 TAO_ECG_Mcast_EH::TAO_ECG_Mcast_EH (TAO_ECG_Dgram_Handler *recv,
00021                                     const ACE_TCHAR *net_if,
00022                                     CORBA::ULong sz)
00023   : net_if_ (net_if ? ACE_OS::strdup (net_if) : 0)
00024   , subscriptions_ ()
00025   , receiver_ (recv)
00026   , recvbuf_size_ (sz)
00027   , observer_ ()
00028   , auto_observer_disconnect_ ()
00029 {
00030   ACE_ASSERT (this->receiver_);
00031 }
00032 
00033 TAO_ECG_Mcast_EH::~TAO_ECG_Mcast_EH (void)
00034 {
00035   ACE_OS::free (this->net_if_);
00036 }
00037 
00038 void
00039 TAO_ECG_Mcast_EH::open (RtecEventChannelAdmin::EventChannel_ptr ec)
00040 {
00041   if (!this->receiver_)
00042     {
00043       // We are shut down.
00044       throw CORBA::INTERNAL();
00045     }
00046 
00047   if (CORBA::is_nil (ec))
00048     {
00049       ACE_ERROR ((LM_ERROR,  "TAO_ECG_Mcast_EH::open(): "
00050                              "nil ec argument"));
00051       throw CORBA::INTERNAL ();
00052     }
00053 
00054   // Create and activate Event Channel Observer.
00055   ACE_NEW (this->observer_,
00056            Observer (this));
00057 
00058   if (!this->observer_.in ())
00059     {
00060       throw CORBA::NO_MEMORY ();
00061     }
00062 
00063   TAO_EC_Object_Deactivator observer_deactivator;
00064   RtecEventChannelAdmin::Observer_var observer_ref;
00065   PortableServer::POA_var poa =
00066     this->observer_->_default_POA ();
00067 
00068   activate (observer_ref,
00069             poa.in (),
00070             this->observer_.in (),
00071             observer_deactivator);
00072 
00073   RtecEventChannelAdmin::Observer_Handle handle =
00074     ec->append_observer (observer_ref.in ());
00075 
00076   this->observer_->set_deactivator (observer_deactivator);
00077   this->auto_observer_disconnect_.set_command
00078     (Observer_Disconnect_Command (handle, ec));
00079 }
00080 
00081 int
00082 TAO_ECG_Mcast_EH::shutdown (void)
00083 {
00084   // Already shut down.
00085   if (!this->receiver_)
00086     return -1;
00087 
00088   // Disconnect Observer from EC.
00089   this->auto_observer_disconnect_.execute ();
00090 
00091   // Shutdown the observer.
00092   if (this->observer_.in ())
00093     {
00094       this->observer_->shutdown ();
00095       this->observer_ = 0;
00096     }
00097 
00098   // Indicates that we are in a shutdown state.
00099   this->receiver_ = 0;
00100 
00101   // Deregister from reactor, close and clean up sockets.
00102   size_t const subscriptions_size = this->subscriptions_.size ();
00103   for (size_t i = 0; i != subscriptions_size; ++i)
00104     {
00105       (void) this->reactor ()->remove_handler (
00106                                this->subscriptions_[i].dgram->get_handle (),
00107                                ACE_Event_Handler::READ_MASK);
00108       (void) this->subscriptions_[i].dgram->close();
00109       delete this->subscriptions_[i].dgram;
00110     }
00111   this->subscriptions_.size (0);
00112 
00113   return 0;
00114 }
00115 
00116 int
00117 TAO_ECG_Mcast_EH::handle_input (ACE_HANDLE fd)
00118 {
00119   size_t const subscriptions_size = this->subscriptions_.size ();
00120   for (size_t i = 0; i != subscriptions_size; ++i)
00121     {
00122       ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
00123       if (socket->get_handle () == fd)
00124         {
00125           return this->receiver_->handle_input (*socket);
00126         }
00127     }
00128   return -1;
00129 }
00130 
00131 void
00132 TAO_ECG_Mcast_EH::update_consumer (
00133          const RtecEventChannelAdmin::ConsumerQOS& sub)
00134 {
00135   Address_Set multicast_addresses;
00136 
00137   this->compute_required_subscriptions (sub,
00138                                         multicast_addresses);
00139 
00140   this->delete_unwanted_subscriptions (multicast_addresses);
00141 
00142   this->add_new_subscriptions (multicast_addresses);
00143 }
00144 
00145 void
00146 TAO_ECG_Mcast_EH::compute_required_subscriptions (
00147     const RtecEventChannelAdmin::ConsumerQOS& sub,
00148     Address_Set& multicast_addresses)
00149 {
00150   CORBA::ULong count = sub.dependencies.length ();
00151   for (CORBA::ULong i = 0; i != count; ++i)
00152     {
00153       const RtecEventComm::EventHeader& header =
00154                            sub.dependencies[i].event.header;
00155       if (0 < header.type && header.type < ACE_ES_EVENT_UNDEFINED)
00156         {
00157           continue;
00158         }
00159 
00160       ACE_INET_Addr inet_addr;
00161       try
00162         {
00163           // Grab the right mcast group for this event...
00164           RtecUDPAdmin::UDP_Address_var udp_addr;
00165 
00166           this->receiver_->get_address (header, udp_addr.out());
00167           switch (udp_addr->_d())
00168             {
00169             case RtecUDPAdmin::Rtec_inet:
00170               inet_addr.set(udp_addr->v4_addr().port,
00171                             udp_addr->v4_addr().ipaddr);
00172               break;
00173             case RtecUDPAdmin::Rtec_inet6:
00174 #if defined (ACE_HAS_IPV6)
00175               inet_addr.set_type(PF_INET6);
00176 #endif
00177               inet_addr.set_address(udp_addr->v6_addr().ipaddr,16,0);
00178               inet_addr.set_port_number(udp_addr->v6_addr().port);
00179               break;
00180             }
00181         }
00182       catch (const ::CORBA::BAD_OPERATION &)
00183         {
00184           // server only supports IPv4
00185            // Grab the right mcast group for this event...
00186           RtecUDPAdmin::UDP_Addr udp_addr;
00187           this->receiver_->get_addr (header, udp_addr);
00188           inet_addr.set (udp_addr.port, udp_addr.ipaddr);
00189         }
00190 
00191       // Ignore errors, if the element is in the set we simply ignore
00192       // the problem...
00193       (void) multicast_addresses.insert (inet_addr);
00194     }
00195 }
00196 
00197 int
00198 TAO_ECG_Mcast_EH::delete_unwanted_subscriptions (
00199     Address_Set& multicast_addresses)
00200 {
00201   for (size_t i = 0; i < this->subscriptions_.size (); ++i)
00202     {
00203       ACE_INET_Addr multicast_group = this->subscriptions_[i].mcast_addr;
00204       if (multicast_addresses.find (multicast_group))
00205         {
00206           // Remove from the list of subscriptions to be added,
00207           // because we already subscribe to it...
00208           (void) multicast_addresses.remove (multicast_group);
00209           continue;
00210         }
00211 
00212       // This subscription is no longer needed - remove from reactor,
00213       // close and delete the socket.
00214       ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
00215       (void) this->reactor ()->remove_handler (socket->get_handle (),
00216                                                ACE_Event_Handler::READ_MASK);
00217       (void) socket->close();
00218       delete socket;
00219       // Move the deleted subscription out of the <subscriptions_>
00220       // array by moving the last subscription in array into its place.
00221       this->subscriptions_[i] =
00222         this->subscriptions_[this->subscriptions_.size () - 1];
00223       this->subscriptions_.size (this->subscriptions_.size () - 1);
00224       --i;
00225     }
00226 
00227     return 0;
00228 }
00229 
00230 void
00231 TAO_ECG_Mcast_EH::add_new_subscriptions (Address_Set& multicast_addresses)
00232 {
00233   typedef ACE_Unbounded_Set_Iterator<ACE_INET_Addr> Address_Iterator;
00234   for (Address_Iterator k = multicast_addresses.begin ();
00235        k != multicast_addresses.end ();
00236        ++k)
00237     {
00238       Subscription new_subscription;
00239       new_subscription.mcast_addr = *k;
00240       ACE_NEW (new_subscription.dgram, ACE_SOCK_Dgram_Mcast);
00241 
00242       size_t const subscriptions_size = this->subscriptions_.size ();
00243       this->subscriptions_.size (subscriptions_size + 1);
00244       this->subscriptions_[subscriptions_size] = new_subscription;
00245 
00246       ACE_SOCK_Dgram_Mcast *socket = new_subscription.dgram;
00247 
00248       if (socket->open (new_subscription.mcast_addr, this->net_if_, 1) == -1) {
00249         ACE_ERROR ((LM_ERROR,
00250                     "Error: %d - Unable to open multicast socket\n",
00251                     errno ));
00252       }
00253 
00254       if ( socket->enable (ACE_NONBLOCK) != 0 ) {
00255         ACE_ERROR ((LM_ERROR,
00256                     "Error: %d - Unable to enable nonblocking on mcast_eh\n",
00257                     errno ));
00258       }
00259 
00260       if (socket->join (new_subscription.mcast_addr, 1, this->net_if_) == -1) {
00261         ACE_ERROR ((LM_ERROR,
00262                     "Error: %d - Unable to join multicast group\n",
00263                     errno ));
00264       }
00265 
00266       if (this->recvbuf_size_ != 0
00267           && (((ACE_SOCK_Dgram *)socket)->set_option(SOL_SOCKET,
00268                                                      SO_RCVBUF,
00269                                                      (void *) &this->recvbuf_size_,
00270                                                      sizeof (this->recvbuf_size_)) == -1)
00271           && errno != ENOTSUP )
00272         {
00273           ACE_ERROR ((LM_ERROR,
00274                       "Error: %d - Unable to set mcast_eh recvbuf_size:%d\n",
00275                       errno,
00276                       this->recvbuf_size_));
00277         }
00278       (void) this->reactor ()->register_handler (
00279                                          socket->get_handle (),
00280                                          this,
00281                                          ACE_Event_Handler::READ_MASK);
00282     }
00283 }
00284 
00285 // ****************************************************************
00286 
00287 TAO_ECG_Mcast_EH::Observer::Observer (TAO_ECG_Mcast_EH* eh)
00288   :  eh_ (eh)
00289 {
00290 }
00291 
00292 void
00293 TAO_ECG_Mcast_EH::Observer::update_consumer (
00294     const RtecEventChannelAdmin::ConsumerQOS& sub)
00295 {
00296   if (this->eh_)
00297     this->eh_->update_consumer (sub);
00298 }
00299 
00300 void
00301 TAO_ECG_Mcast_EH::Observer::update_supplier (
00302     const RtecEventChannelAdmin::SupplierQOS&)
00303 {
00304 }
00305 
00306 void
00307 TAO_ECG_Mcast_EH::Observer::shutdown (void)
00308 {
00309   this->eh_ = 0;
00310   this->deactivator_.deactivate ();
00311 }
00312 
00313 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:06 2010 for TAO_RTEvent by  doxygen 1.4.7