ECG_Mcast_EH.cpp

Go to the documentation of this file.
00001 // ECG_Mcast_EH.cpp,v 1.34 2006/03/14 06:14:25 jtc Exp
00002 
00003 #include "orbsvcs/Event/ECG_Mcast_EH.h"
00004 #include "orbsvcs/Event/ECG_UDP_Receiver.h"
00005 #include "orbsvcs/Event_Service_Constants.h"
00006 #include "orbsvcs/Event/EC_Lifetime_Utils_T.h"
00007 #include "ace/Unbounded_Set.h"
00008 #include "ace/OS_NS_string.h"
00009 #include "ace/Reactor.h"
00010 #include "ace/os_include/os_fcntl.h"
00011 
00012 #if !defined(__ACE_INLINE__)
00013 #include "orbsvcs/Event/ECG_Mcast_EH.i"
00014 #endif /* __ACE_INLINE__ */
00015 
00016 ACE_RCSID(Event, ECG_Mcast_EH, "ECG_Mcast_EH.cpp,v 1.34 2006/03/14 06:14:25 jtc Exp")
00017 
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019 
00020 TAO_ECG_Mcast_EH::TAO_ECG_Mcast_EH (TAO_ECG_Dgram_Handler *recv,
00021                                     const ACE_TCHAR *net_if,
00022                                     CORBA::ULong sz)
00023   : net_if_ (net_if ? ACE_OS::strdup (net_if) : 0)
00024   , subscriptions_ ()
00025   , receiver_ (recv)
00026   , recvbuf_size_ (sz)
00027   , observer_ ()
00028   , auto_observer_disconnect_ ()
00029 {
00030   ACE_ASSERT (this->receiver_);
00031 }
00032 
00033 TAO_ECG_Mcast_EH::~TAO_ECG_Mcast_EH (void)
00034 {
00035   ACE_OS::free (this->net_if_);
00036 }
00037 
00038 void
00039 TAO_ECG_Mcast_EH::open (RtecEventChannelAdmin::EventChannel_ptr ec
00040                         ACE_ENV_ARG_DECL)
00041 {
00042   if (!this->receiver_)
00043     {
00044       // We are shut down.
00045       ACE_THROW (CORBA::INTERNAL());
00046     }
00047 
00048   if (CORBA::is_nil (ec))
00049     {
00050       ACE_ERROR ((LM_ERROR,  "TAO_ECG_Mcast_EH::open(): "
00051                              "nil ec argument"));
00052       ACE_THROW (CORBA::INTERNAL ());
00053     }
00054 
00055   // Create and activate Event Channel Observer.
00056   ACE_NEW (this->observer_,
00057            Observer (this));
00058 
00059   if (!this->observer_.in ())
00060     {
00061       ACE_THROW (CORBA::NO_MEMORY ());
00062     }
00063 
00064   TAO_EC_Object_Deactivator observer_deactivator;
00065   RtecEventChannelAdmin::Observer_var observer_ref;
00066   PortableServer::POA_var poa =
00067     this->observer_->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
00068   ACE_CHECK;
00069 
00070   activate (observer_ref,
00071             poa.in (),
00072             this->observer_.in (),
00073             observer_deactivator
00074             ACE_ENV_ARG_PARAMETER);
00075   ACE_CHECK;
00076 
00077   RtecEventChannelAdmin::Observer_Handle handle =
00078     ec->append_observer (observer_ref.in ()
00079                          ACE_ENV_ARG_PARAMETER);
00080   ACE_CHECK;
00081 
00082   this->observer_->set_deactivator (observer_deactivator);
00083   this->auto_observer_disconnect_.set_command
00084     (Observer_Disconnect_Command (handle, ec));
00085 }
00086 
00087 int
00088 TAO_ECG_Mcast_EH::shutdown (void)
00089 {
00090   // Already shut down.
00091   if (!this->receiver_)
00092     return -1;
00093 
00094   // Disconnect Observer from EC.
00095   this->auto_observer_disconnect_.execute ();
00096 
00097   // Shutdown the observer.
00098   if (this->observer_.in ())
00099     {
00100       this->observer_->shutdown ();
00101       this->observer_ = 0;
00102     }
00103 
00104   // Indicates that we are in a shutdown state.
00105   this->receiver_ = 0;
00106 
00107   // Deregister from reactor, close and clean up sockets.
00108   size_t subscriptions_size = this->subscriptions_.size ();
00109   for (size_t i = 0; i != subscriptions_size; ++i)
00110     {
00111       (void) this->reactor ()->remove_handler (
00112                                this->subscriptions_[i].dgram->get_handle (),
00113                                ACE_Event_Handler::READ_MASK);
00114       (void) this->subscriptions_[i].dgram->close();
00115       delete this->subscriptions_[i].dgram;
00116     }
00117   this->subscriptions_.size (0);
00118 
00119   return 0;
00120 }
00121 
00122 int
00123 TAO_ECG_Mcast_EH::handle_input (ACE_HANDLE fd)
00124 {
00125   size_t subscriptions_size = this->subscriptions_.size ();
00126   for (size_t i = 0; i != subscriptions_size; ++i)
00127     {
00128       ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
00129       if (socket->get_handle () == fd)
00130         {
00131           return this->receiver_->handle_input (*socket);
00132         }
00133     }
00134   return -1;
00135 }
00136 
00137 void
00138 TAO_ECG_Mcast_EH::update_consumer (
00139          const RtecEventChannelAdmin::ConsumerQOS& sub
00140     ACE_ENV_ARG_DECL)
00141       ACE_THROW_SPEC ((CORBA::SystemException))
00142 {
00143   Address_Set multicast_addresses;
00144 
00145   this->compute_required_subscriptions (sub,
00146                                         multicast_addresses
00147                                         ACE_ENV_ARG_PARAMETER);
00148   ACE_CHECK;
00149 
00150   this->delete_unwanted_subscriptions (multicast_addresses);
00151 
00152   this->add_new_subscriptions (multicast_addresses);
00153 }
00154 
00155 void
00156 TAO_ECG_Mcast_EH::compute_required_subscriptions (
00157     const RtecEventChannelAdmin::ConsumerQOS& sub,
00158     Address_Set& multicast_addresses
00159     ACE_ENV_ARG_DECL)
00160   ACE_THROW_SPEC ((CORBA::SystemException))
00161 {
00162   CORBA::ULong count = sub.dependencies.length ();
00163   for (CORBA::ULong i = 0; i != count; ++i)
00164     {
00165       const RtecEventComm::EventHeader& header =
00166                            sub.dependencies[i].event.header;
00167       if (0 < header.type && header.type < ACE_ES_EVENT_UNDEFINED)
00168         {
00169           continue;
00170         }
00171       RtecUDPAdmin::UDP_Addr addr;
00172 
00173       this->receiver_->get_addr (header, addr ACE_ENV_ARG_PARAMETER);
00174       ACE_CHECK;
00175 
00176       ACE_INET_Addr inet_addr (addr.port, addr.ipaddr);
00177       // Ignore errors, if the element is in the set we simply ignore
00178       // the problem...
00179       (void) multicast_addresses.insert (inet_addr);
00180     }
00181 }
00182 
00183 int
00184 TAO_ECG_Mcast_EH::delete_unwanted_subscriptions (
00185     Address_Set& multicast_addresses)
00186 {
00187   for (size_t i = 0; i < this->subscriptions_.size (); ++i)
00188     {
00189       ACE_INET_Addr multicast_group = this->subscriptions_[i].mcast_addr;
00190       if (multicast_addresses.find (multicast_group))
00191         {
00192           // Remove from the list of subscriptions to be added,
00193           // because we already subscribe to it...
00194           (void) multicast_addresses.remove (multicast_group);
00195           continue;
00196         }
00197 
00198       // This subscription is no longer needed - remove from reactor,
00199       // close and delete the socket.
00200       ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
00201       (void) this->reactor ()->remove_handler (socket->get_handle (),
00202                                                ACE_Event_Handler::READ_MASK);
00203       (void) socket->close();
00204       delete socket;
00205       // Move the deleted subscription out of the <subscriptions_>
00206       // array by moving the last subscription in array into its place.
00207       this->subscriptions_[i] =
00208         this->subscriptions_[this->subscriptions_.size () - 1];
00209       this->subscriptions_.size (this->subscriptions_.size () - 1);
00210       --i;
00211     }
00212 
00213     return 0;
00214 }
00215 
00216 void
00217 TAO_ECG_Mcast_EH::add_new_subscriptions (Address_Set& multicast_addresses)
00218 {
00219   typedef ACE_Unbounded_Set_Iterator<ACE_INET_Addr> Address_Iterator;
00220   for (Address_Iterator k = multicast_addresses.begin ();
00221        k != multicast_addresses.end ();
00222        ++k)
00223     {
00224       Subscription new_subscription;
00225       new_subscription.mcast_addr = *k;
00226       ACE_NEW (new_subscription.dgram, ACE_SOCK_Dgram_Mcast);
00227 
00228       size_t subscriptions_size = this->subscriptions_.size ();
00229       this->subscriptions_.size (subscriptions_size + 1);
00230       this->subscriptions_[subscriptions_size] = new_subscription;
00231 
00232       ACE_SOCK_Dgram_Mcast *socket = new_subscription.dgram;
00233       socket->subscribe (new_subscription.mcast_addr, 1, this->net_if_);
00234       if ( socket->enable (ACE_NONBLOCK) != 0 ) {
00235         ACE_ERROR ((LM_ERROR,
00236                     "Error: %d - Unable to enable nonblocking on mcast_eh\n",
00237                     errno ));
00238       }
00239 
00240       if (this->recvbuf_size_ != 0
00241           && (((ACE_SOCK_Dgram *)socket)->set_option(SOL_SOCKET,
00242                                                      SO_RCVBUF,
00243                                                      (void *) &this->recvbuf_size_,
00244                                                      sizeof (this->recvbuf_size_)) == -1)
00245           && errno != ENOTSUP )
00246         {
00247           ACE_ERROR ((LM_ERROR,
00248                       "Error: %d - Unable to set mcast_eh recvbuf_size:%d\n",
00249                       errno,
00250                       this->recvbuf_size_));
00251         }
00252       (void) this->reactor ()->register_handler (
00253                                          socket->get_handle (),
00254                                          this,
00255                                          ACE_Event_Handler::READ_MASK);
00256     }
00257 }
00258 
00259 // ****************************************************************
00260 
00261 TAO_ECG_Mcast_EH::Observer::Observer (TAO_ECG_Mcast_EH* eh)
00262   :  eh_ (eh)
00263 {
00264 }
00265 
00266 void
00267 TAO_ECG_Mcast_EH::Observer::update_consumer (
00268     const RtecEventChannelAdmin::ConsumerQOS& sub
00269     ACE_ENV_ARG_DECL)
00270       ACE_THROW_SPEC ((CORBA::SystemException))
00271 {
00272   if (this->eh_)
00273     this->eh_->update_consumer (sub ACE_ENV_ARG_PARAMETER);
00274 }
00275 
00276 void
00277 TAO_ECG_Mcast_EH::Observer::update_supplier (
00278     const RtecEventChannelAdmin::SupplierQOS&
00279     ACE_ENV_ARG_DECL_NOT_USED)
00280       ACE_THROW_SPEC ((CORBA::SystemException))
00281 {
00282 }
00283 
00284 void
00285 TAO_ECG_Mcast_EH::Observer::shutdown (void)
00286 {
00287   this->eh_ = 0;
00288   this->deactivator_.deactivate ();
00289 }
00290 
00291 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:11:10 2006 for TAO_RTEvent by doxygen 1.3.6