00001
00002
00003 #include "orbsvcs/Event/ECG_Mcast_EH.h"
00004 #include "orbsvcs/Event/ECG_UDP_Receiver.h"
00005 #include "orbsvcs/Event_Service_Constants.h"
00006 #include "orbsvcs/Event/EC_Lifetime_Utils_T.h"
00007 #include "ace/Unbounded_Set.h"
00008 #include "ace/OS_NS_string.h"
00009 #include "ace/Reactor.h"
00010 #include "ace/os_include/os_fcntl.h"
00011
00012 #if !defined(__ACE_INLINE__)
00013 #include "orbsvcs/Event/ECG_Mcast_EH.i"
00014 #endif
00015
00016 ACE_RCSID(Event, ECG_Mcast_EH, "ECG_Mcast_EH.cpp,v 1.34 2006/03/14 06:14:25 jtc Exp")
00017
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019
00020 TAO_ECG_Mcast_EH::TAO_ECG_Mcast_EH (TAO_ECG_Dgram_Handler *recv,
00021 const ACE_TCHAR *net_if,
00022 CORBA::ULong sz)
00023 : net_if_ (net_if ? ACE_OS::strdup (net_if) : 0)
00024 , subscriptions_ ()
00025 , receiver_ (recv)
00026 , recvbuf_size_ (sz)
00027 , observer_ ()
00028 , auto_observer_disconnect_ ()
00029 {
00030 ACE_ASSERT (this->receiver_);
00031 }
00032
00033 TAO_ECG_Mcast_EH::~TAO_ECG_Mcast_EH (void)
00034 {
00035 ACE_OS::free (this->net_if_);
00036 }
00037
00038 void
00039 TAO_ECG_Mcast_EH::open (RtecEventChannelAdmin::EventChannel_ptr ec
00040 ACE_ENV_ARG_DECL)
00041 {
00042 if (!this->receiver_)
00043 {
00044
00045 ACE_THROW (CORBA::INTERNAL());
00046 }
00047
00048 if (CORBA::is_nil (ec))
00049 {
00050 ACE_ERROR ((LM_ERROR, "TAO_ECG_Mcast_EH::open(): "
00051 "nil ec argument"));
00052 ACE_THROW (CORBA::INTERNAL ());
00053 }
00054
00055
00056 ACE_NEW (this->observer_,
00057 Observer (this));
00058
00059 if (!this->observer_.in ())
00060 {
00061 ACE_THROW (CORBA::NO_MEMORY ());
00062 }
00063
00064 TAO_EC_Object_Deactivator observer_deactivator;
00065 RtecEventChannelAdmin::Observer_var observer_ref;
00066 PortableServer::POA_var poa =
00067 this->observer_->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
00068 ACE_CHECK;
00069
00070 activate (observer_ref,
00071 poa.in (),
00072 this->observer_.in (),
00073 observer_deactivator
00074 ACE_ENV_ARG_PARAMETER);
00075 ACE_CHECK;
00076
00077 RtecEventChannelAdmin::Observer_Handle handle =
00078 ec->append_observer (observer_ref.in ()
00079 ACE_ENV_ARG_PARAMETER);
00080 ACE_CHECK;
00081
00082 this->observer_->set_deactivator (observer_deactivator);
00083 this->auto_observer_disconnect_.set_command
00084 (Observer_Disconnect_Command (handle, ec));
00085 }
00086
00087 int
00088 TAO_ECG_Mcast_EH::shutdown (void)
00089 {
00090
00091 if (!this->receiver_)
00092 return -1;
00093
00094
00095 this->auto_observer_disconnect_.execute ();
00096
00097
00098 if (this->observer_.in ())
00099 {
00100 this->observer_->shutdown ();
00101 this->observer_ = 0;
00102 }
00103
00104
00105 this->receiver_ = 0;
00106
00107
00108 size_t subscriptions_size = this->subscriptions_.size ();
00109 for (size_t i = 0; i != subscriptions_size; ++i)
00110 {
00111 (void) this->reactor ()->remove_handler (
00112 this->subscriptions_[i].dgram->get_handle (),
00113 ACE_Event_Handler::READ_MASK);
00114 (void) this->subscriptions_[i].dgram->close();
00115 delete this->subscriptions_[i].dgram;
00116 }
00117 this->subscriptions_.size (0);
00118
00119 return 0;
00120 }
00121
00122 int
00123 TAO_ECG_Mcast_EH::handle_input (ACE_HANDLE fd)
00124 {
00125 size_t subscriptions_size = this->subscriptions_.size ();
00126 for (size_t i = 0; i != subscriptions_size; ++i)
00127 {
00128 ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
00129 if (socket->get_handle () == fd)
00130 {
00131 return this->receiver_->handle_input (*socket);
00132 }
00133 }
00134 return -1;
00135 }
00136
00137 void
00138 TAO_ECG_Mcast_EH::update_consumer (
00139 const RtecEventChannelAdmin::ConsumerQOS& sub
00140 ACE_ENV_ARG_DECL)
00141 ACE_THROW_SPEC ((CORBA::SystemException))
00142 {
00143 Address_Set multicast_addresses;
00144
00145 this->compute_required_subscriptions (sub,
00146 multicast_addresses
00147 ACE_ENV_ARG_PARAMETER);
00148 ACE_CHECK;
00149
00150 this->delete_unwanted_subscriptions (multicast_addresses);
00151
00152 this->add_new_subscriptions (multicast_addresses);
00153 }
00154
00155 void
00156 TAO_ECG_Mcast_EH::compute_required_subscriptions (
00157 const RtecEventChannelAdmin::ConsumerQOS& sub,
00158 Address_Set& multicast_addresses
00159 ACE_ENV_ARG_DECL)
00160 ACE_THROW_SPEC ((CORBA::SystemException))
00161 {
00162 CORBA::ULong count = sub.dependencies.length ();
00163 for (CORBA::ULong i = 0; i != count; ++i)
00164 {
00165 const RtecEventComm::EventHeader& header =
00166 sub.dependencies[i].event.header;
00167 if (0 < header.type && header.type < ACE_ES_EVENT_UNDEFINED)
00168 {
00169 continue;
00170 }
00171 RtecUDPAdmin::UDP_Addr addr;
00172
00173 this->receiver_->get_addr (header, addr ACE_ENV_ARG_PARAMETER);
00174 ACE_CHECK;
00175
00176 ACE_INET_Addr inet_addr (addr.port, addr.ipaddr);
00177
00178
00179 (void) multicast_addresses.insert (inet_addr);
00180 }
00181 }
00182
00183 int
00184 TAO_ECG_Mcast_EH::delete_unwanted_subscriptions (
00185 Address_Set& multicast_addresses)
00186 {
00187 for (size_t i = 0; i < this->subscriptions_.size (); ++i)
00188 {
00189 ACE_INET_Addr multicast_group = this->subscriptions_[i].mcast_addr;
00190 if (multicast_addresses.find (multicast_group))
00191 {
00192
00193
00194 (void) multicast_addresses.remove (multicast_group);
00195 continue;
00196 }
00197
00198
00199
00200 ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram;
00201 (void) this->reactor ()->remove_handler (socket->get_handle (),
00202 ACE_Event_Handler::READ_MASK);
00203 (void) socket->close();
00204 delete socket;
00205
00206
00207 this->subscriptions_[i] =
00208 this->subscriptions_[this->subscriptions_.size () - 1];
00209 this->subscriptions_.size (this->subscriptions_.size () - 1);
00210 --i;
00211 }
00212
00213 return 0;
00214 }
00215
00216 void
00217 TAO_ECG_Mcast_EH::add_new_subscriptions (Address_Set& multicast_addresses)
00218 {
00219 typedef ACE_Unbounded_Set_Iterator<ACE_INET_Addr> Address_Iterator;
00220 for (Address_Iterator k = multicast_addresses.begin ();
00221 k != multicast_addresses.end ();
00222 ++k)
00223 {
00224 Subscription new_subscription;
00225 new_subscription.mcast_addr = *k;
00226 ACE_NEW (new_subscription.dgram, ACE_SOCK_Dgram_Mcast);
00227
00228 size_t subscriptions_size = this->subscriptions_.size ();
00229 this->subscriptions_.size (subscriptions_size + 1);
00230 this->subscriptions_[subscriptions_size] = new_subscription;
00231
00232 ACE_SOCK_Dgram_Mcast *socket = new_subscription.dgram;
00233 socket->subscribe (new_subscription.mcast_addr, 1, this->net_if_);
00234 if ( socket->enable (ACE_NONBLOCK) != 0 ) {
00235 ACE_ERROR ((LM_ERROR,
00236 "Error: %d - Unable to enable nonblocking on mcast_eh\n",
00237 errno ));
00238 }
00239
00240 if (this->recvbuf_size_ != 0
00241 && (((ACE_SOCK_Dgram *)socket)->set_option(SOL_SOCKET,
00242 SO_RCVBUF,
00243 (void *) &this->recvbuf_size_,
00244 sizeof (this->recvbuf_size_)) == -1)
00245 && errno != ENOTSUP )
00246 {
00247 ACE_ERROR ((LM_ERROR,
00248 "Error: %d - Unable to set mcast_eh recvbuf_size:%d\n",
00249 errno,
00250 this->recvbuf_size_));
00251 }
00252 (void) this->reactor ()->register_handler (
00253 socket->get_handle (),
00254 this,
00255 ACE_Event_Handler::READ_MASK);
00256 }
00257 }
00258
00259
00260
00261 TAO_ECG_Mcast_EH::Observer::Observer (TAO_ECG_Mcast_EH* eh)
00262 : eh_ (eh)
00263 {
00264 }
00265
00266 void
00267 TAO_ECG_Mcast_EH::Observer::update_consumer (
00268 const RtecEventChannelAdmin::ConsumerQOS& sub
00269 ACE_ENV_ARG_DECL)
00270 ACE_THROW_SPEC ((CORBA::SystemException))
00271 {
00272 if (this->eh_)
00273 this->eh_->update_consumer (sub ACE_ENV_ARG_PARAMETER);
00274 }
00275
00276 void
00277 TAO_ECG_Mcast_EH::Observer::update_supplier (
00278 const RtecEventChannelAdmin::SupplierQOS&
00279 ACE_ENV_ARG_DECL_NOT_USED)
00280 ACE_THROW_SPEC ((CORBA::SystemException))
00281 {
00282 }
00283
00284 void
00285 TAO_ECG_Mcast_EH::Observer::shutdown (void)
00286 {
00287 this->eh_ = 0;
00288 this->deactivator_.deactivate ();
00289 }
00290
00291 TAO_END_VERSIONED_NAMESPACE_DECL