00001
00002
00003 #include "orbsvcs/Event/ECG_Mcast_EH.h"
00004 #include "orbsvcs/Event/ECG_UDP_Receiver.h"
00005 #include "orbsvcs/Event_Service_Constants.h"
00006 #include "orbsvcs/Event/EC_Lifetime_Utils_T.h"
00007 #include "ace/Unbounded_Set.h"
00008 #include "ace/OS_NS_string.h"
00009 #include "ace/Reactor.h"
00010 #include "ace/os_include/os_fcntl.h"
00011
00012 #if !defined(__ACE_INLINE__)
00013 #include "orbsvcs/Event/ECG_Mcast_EH.inl"
00014 #endif
00015
00016 ACE_RCSID(Event, ECG_Mcast_EH, "$Id: ECG_Mcast_EH.cpp 78550 2007-06-05 19:01:50Z mesnier_p $")
00017
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019
00020 TAO_ECG_Mcast_EH::TAO_ECG_Mcast_EH (TAO_ECG_Dgram_Handler *recv,
00021 const ACE_TCHAR *net_if,
00022 CORBA::ULong sz)
00023 : net_if_ (net_if ? ACE_OS::strdup (net_if) : 0)
00024 , subscriptions_ ()
00025 , receiver_ (recv)
00026 , recvbuf_size_ (sz)
00027 , observer_ ()
00028 , auto_observer_disconnect_ ()
00029 {
00030 ACE_ASSERT (this->receiver_);
00031 }
00032
00033 TAO_ECG_Mcast_EH::~TAO_ECG_Mcast_EH (void)
00034 {
00035 ACE_OS::free (this->net_if_);
00036 }
00037
00038 void
00039 TAO_ECG_Mcast_EH::open (RtecEventChannelAdmin::EventChannel_ptr ec)
00040 {
00041 if (!this->receiver_)
00042 {
00043
00044 throw CORBA::INTERNAL();
00045 }
00046
00047 if (CORBA::is_nil (ec))
00048 {
00049 ACE_ERROR ((LM_ERROR, "TAO_ECG_Mcast_EH::open(): "
00050 "nil ec argument"));
00051 throw CORBA::INTERNAL ();
00052 }
00053
00054
00055 ACE_NEW (this->observer_,
00056 Observer (this));
00057
00058 if (!this->observer_.in ())
00059 {
00060 throw CORBA::NO_MEMORY ();
00061 }
00062
00063 TAO_EC_Object_Deactivator observer_deactivator;
00064 RtecEventChannelAdmin::Observer_var observer_ref;
00065 PortableServer::POA_var poa =
00066 this->observer_->_default_POA ();
00067
00068 activate (observer_ref,
00069 poa.in (),
00070 this->observer_.in (),
00071 observer_deactivator);
00072
00073 RtecEventChannelAdmin::Observer_Handle handle =
00074 ec->append_observer (observer_ref.in ());
00075
00076 this->observer_->set_deactivator (observer_deactivator);
00077 this->auto_observer_disconnect_.set_command
00078 (Observer_Disconnect_Command (handle, ec));
00079 }
00080
00081 int
00082 TAO_ECG_Mcast_EH::shutdown (void)
00083 {
00084
00085 if (!this->receiver_)
00086 return -1;
00087
00088
00089 this->auto_observer_disconnect_.execute ();
00090
00091
00092 if (this->observer_.in ())
00093 {
00094 this->observer_->shutdown ();
00095 this->observer_ = 0;
00096 }
00097
00098
00099 this->receiver_ = 0;
00100
00101
00102 size_t const subscriptions_size = this->subscriptions_.size ();
00103 for (size_t i = 0; i != subscriptions_size; ++i)
00104 {
00105 (void) this->reactor ()->remove_handler (
00106 this->subscriptions_[i].dgram->get_handle (),
00107 ACE_Event_Handler::READ_MASK);
00108 (void) this->subscriptions_[i].dgram->close();
00109 delete this->subscriptions_[i].dgram;
00110 }
00111 this->subscriptions_.size (0);
00112
00113 return 0;
00114 }
00115
00116 int
00117 TAO_ECG_Mcast_EH::handle_input (ACE_HANDLE fd)
00118 {
00119 size_t const subscriptions_size = this->subscriptions_.size ();
00120 for (size_t i = 0; i != subscriptions_size; ++i)
00121 {
00122 ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
00123 if (socket->get_handle () == fd)
00124 {
00125 return this->receiver_->handle_input (*socket);
00126 }
00127 }
00128 return -1;
00129 }
00130
00131 void
00132 TAO_ECG_Mcast_EH::update_consumer (
00133 const RtecEventChannelAdmin::ConsumerQOS& sub)
00134 {
00135 Address_Set multicast_addresses;
00136
00137 this->compute_required_subscriptions (sub,
00138 multicast_addresses);
00139
00140 this->delete_unwanted_subscriptions (multicast_addresses);
00141
00142 this->add_new_subscriptions (multicast_addresses);
00143 }
00144
00145 void
00146 TAO_ECG_Mcast_EH::compute_required_subscriptions (
00147 const RtecEventChannelAdmin::ConsumerQOS& sub,
00148 Address_Set& multicast_addresses)
00149 {
00150 CORBA::ULong count = sub.dependencies.length ();
00151 for (CORBA::ULong i = 0; i != count; ++i)
00152 {
00153 const RtecEventComm::EventHeader& header =
00154 sub.dependencies[i].event.header;
00155 if (0 < header.type && header.type < ACE_ES_EVENT_UNDEFINED)
00156 {
00157 continue;
00158 }
00159
00160 ACE_INET_Addr inet_addr;
00161 try
00162 {
00163
00164 RtecUDPAdmin::UDP_Address_var udp_addr;
00165
00166 this->receiver_->get_address (header, udp_addr.out());
00167 switch (udp_addr->_d())
00168 {
00169 case RtecUDPAdmin::Rtec_inet:
00170 inet_addr.set(udp_addr->v4_addr().port,
00171 udp_addr->v4_addr().ipaddr);
00172 break;
00173 case RtecUDPAdmin::Rtec_inet6:
00174 #if defined (ACE_HAS_IPV6)
00175 inet_addr.set_type(PF_INET6);
00176 #endif
00177 inet_addr.set_address(udp_addr->v6_addr().ipaddr,16,0);
00178 inet_addr.set_port_number(udp_addr->v6_addr().port);
00179 break;
00180 }
00181 }
00182 catch (const ::CORBA::BAD_OPERATION &)
00183 {
00184
00185
00186 RtecUDPAdmin::UDP_Addr udp_addr;
00187 this->receiver_->get_addr (header, udp_addr);
00188 inet_addr.set (udp_addr.port, udp_addr.ipaddr);
00189 }
00190
00191
00192
00193 (void) multicast_addresses.insert (inet_addr);
00194 }
00195 }
00196
00197 int
00198 TAO_ECG_Mcast_EH::delete_unwanted_subscriptions (
00199 Address_Set& multicast_addresses)
00200 {
00201 for (size_t i = 0; i < this->subscriptions_.size (); ++i)
00202 {
00203 ACE_INET_Addr multicast_group = this->subscriptions_[i].mcast_addr;
00204 if (multicast_addresses.find (multicast_group))
00205 {
00206
00207
00208 (void) multicast_addresses.remove (multicast_group);
00209 continue;
00210 }
00211
00212
00213
00214 ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
00215 (void) this->reactor ()->remove_handler (socket->get_handle (),
00216 ACE_Event_Handler::READ_MASK);
00217 (void) socket->close();
00218 delete socket;
00219
00220
00221 this->subscriptions_[i] =
00222 this->subscriptions_[this->subscriptions_.size () - 1];
00223 this->subscriptions_.size (this->subscriptions_.size () - 1);
00224 --i;
00225 }
00226
00227 return 0;
00228 }
00229
00230 void
00231 TAO_ECG_Mcast_EH::add_new_subscriptions (Address_Set& multicast_addresses)
00232 {
00233 typedef ACE_Unbounded_Set_Iterator<ACE_INET_Addr> Address_Iterator;
00234 for (Address_Iterator k = multicast_addresses.begin ();
00235 k != multicast_addresses.end ();
00236 ++k)
00237 {
00238 Subscription new_subscription;
00239 new_subscription.mcast_addr = *k;
00240 ACE_NEW (new_subscription.dgram, ACE_SOCK_Dgram_Mcast);
00241
00242 size_t const subscriptions_size = this->subscriptions_.size ();
00243 this->subscriptions_.size (subscriptions_size + 1);
00244 this->subscriptions_[subscriptions_size] = new_subscription;
00245
00246 ACE_SOCK_Dgram_Mcast *socket = new_subscription.dgram;
00247
00248 if (socket->open (new_subscription.mcast_addr, this->net_if_, 1) == -1) {
00249 ACE_ERROR ((LM_ERROR,
00250 "Error: %d - Unable to open multicast socket\n",
00251 errno ));
00252 }
00253
00254 if ( socket->enable (ACE_NONBLOCK) != 0 ) {
00255 ACE_ERROR ((LM_ERROR,
00256 "Error: %d - Unable to enable nonblocking on mcast_eh\n",
00257 errno ));
00258 }
00259
00260 if (socket->join (new_subscription.mcast_addr, 1, this->net_if_) == -1) {
00261 ACE_ERROR ((LM_ERROR,
00262 "Error: %d - Unable to join multicast group\n",
00263 errno ));
00264 }
00265
00266 if (this->recvbuf_size_ != 0
00267 && (((ACE_SOCK_Dgram *)socket)->set_option(SOL_SOCKET,
00268 SO_RCVBUF,
00269 (void *) &this->recvbuf_size_,
00270 sizeof (this->recvbuf_size_)) == -1)
00271 && errno != ENOTSUP )
00272 {
00273 ACE_ERROR ((LM_ERROR,
00274 "Error: %d - Unable to set mcast_eh recvbuf_size:%d\n",
00275 errno,
00276 this->recvbuf_size_));
00277 }
00278 (void) this->reactor ()->register_handler (
00279 socket->get_handle (),
00280 this,
00281 ACE_Event_Handler::READ_MASK);
00282 }
00283 }
00284
00285
00286
00287 TAO_ECG_Mcast_EH::Observer::Observer (TAO_ECG_Mcast_EH* eh)
00288 : eh_ (eh)
00289 {
00290 }
00291
00292 void
00293 TAO_ECG_Mcast_EH::Observer::update_consumer (
00294 const RtecEventChannelAdmin::ConsumerQOS& sub)
00295 {
00296 if (this->eh_)
00297 this->eh_->update_consumer (sub);
00298 }
00299
00300 void
00301 TAO_ECG_Mcast_EH::Observer::update_supplier (
00302 const RtecEventChannelAdmin::SupplierQOS&)
00303 {
00304 }
00305
00306 void
00307 TAO_ECG_Mcast_EH::Observer::shutdown (void)
00308 {
00309 this->eh_ = 0;
00310 this->deactivator_.deactivate ();
00311 }
00312
00313 TAO_END_VERSIONED_NAMESPACE_DECL