#include <ECG_Mcast_EH.h>
Inheritance diagram for TAO_ECG_Mcast_EH:
Public Member Functions | |
virtual int | handle_input (ACE_HANDLE fd) |
TAO_ECG_Mcast_EH (TAO_ECG_Dgram_Handler *recv, const ACE_TCHAR *net_if=0, CORBA::ULong buf_sz=0) | |
virtual | ~TAO_ECG_Mcast_EH (void) |
Destructor. | |
void | open (RtecEventChannelAdmin::EventChannel_ptr ec) |
virtual int | shutdown (void) |
TAO_ECG_Handler_Shutdown method. | |
Private Types | |
typedef ACE_Unbounded_Set< ACE_INET_Addr > | Address_Set |
typedef ACE_Array_Base< Subscription > | Subscriptions |
Private Member Functions | |
void | update_consumer (const RtecEventChannelAdmin::ConsumerQOS &sub) throw (CORBA::SystemException) |
void | compute_required_subscriptions (const RtecEventChannelAdmin::ConsumerQOS &sub, Address_Set &multicast_addresses) throw (CORBA::SystemException) |
int | delete_unwanted_subscriptions (Address_Set &multicast_addresses) |
void | add_new_subscriptions (Address_Set &multicast_addresses) |
Private Attributes | |
ACE_TCHAR * | net_if_ |
The NIC used to subscribe for multicast traffic. | |
Subscriptions | subscriptions_ |
List of multicast addresses we subscribe to and dgrams we use. | |
TAO_ECG_Dgram_Handler * | receiver_ |
We callback to this object when a message arrives. | |
CORBA::ULong | recvbuf_size_ |
SOCKbuf size. | |
TAO_EC_Servant_Var< Observer > | observer_ |
TAO_EC_Auto_Command< Observer_Disconnect_Command > | auto_observer_disconnect_ |
Friends | |
class | Observer |
Make update_consumer () accessible to Observer. |
This object acts as an Observer to Event Channel. It subscribes to multicast groups that carry events matching the EC's subscriptions. This object then receives callbacks from the Reactor when data is available on the mcast sockets and alerts TAO_ECG_Dgram_Handler, which reads the data, transforms it into event and pushes to the Event Channel.
Definition at line 52 of file ECG_Mcast_EH.h.
|
Definition at line 155 of file ECG_Mcast_EH.h. Referenced by add_new_subscriptions(), compute_required_subscriptions(), delete_unwanted_subscriptions(), and update_consumer(). |
|
Definition at line 239 of file ECG_Mcast_EH.h. |
|
Constructor. Messages received by this EH will be forwarded to the recv. net_if can be used to specify NIC where multicast messages are expected. would be used to alter the default buffer size. See comments for receiver_ data member on why raw pointer is used for the recv argument. Definition at line 20 of file ECG_Mcast_EH.cpp. References ACE_ASSERT.
00023 : net_if_ (net_if ? ACE_OS::strdup (net_if) : 0) 00024 , subscriptions_ () 00025 , receiver_ (recv) 00026 , recvbuf_size_ (sz) 00027 , observer_ () 00028 , auto_observer_disconnect_ () 00029 { 00030 ACE_ASSERT (this->receiver_); 00031 } |
|
Destructor.
Definition at line 33 of file ECG_Mcast_EH.cpp. References ACE_OS::free().
00034 { 00035 ACE_OS::free (this->net_if_); 00036 } |
|
Definition at line 217 of file ECG_Mcast_EH.cpp. References ACE_ERROR, ACE_NEW, ACE_NONBLOCK, Address_Set, ACE_Unbounded_Set< T >::begin(), TAO_ECG_Mcast_EH::Subscription::dgram, ACE_Unbounded_Set< T >::end(), ENOTSUP, LM_ERROR, TAO_ECG_Mcast_EH::Subscription::mcast_addr, ACE_Event_Handler::reactor(), recvbuf_size_, ACE_Reactor::register_handler(), ACE_Array_Base< T >::size(), and subscriptions_.
00218 { 00219 typedef ACE_Unbounded_Set_Iterator<ACE_INET_Addr> Address_Iterator; 00220 for (Address_Iterator k = multicast_addresses.begin (); 00221 k != multicast_addresses.end (); 00222 ++k) 00223 { 00224 Subscription new_subscription; 00225 new_subscription.mcast_addr = *k; 00226 ACE_NEW (new_subscription.dgram, ACE_SOCK_Dgram_Mcast); 00227 00228 size_t subscriptions_size = this->subscriptions_.size (); 00229 this->subscriptions_.size (subscriptions_size + 1); 00230 this->subscriptions_[subscriptions_size] = new_subscription; 00231 00232 ACE_SOCK_Dgram_Mcast *socket = new_subscription.dgram; 00233 socket->subscribe (new_subscription.mcast_addr, 1, this->net_if_); 00234 if ( socket->enable (ACE_NONBLOCK) != 0 ) { 00235 ACE_ERROR ((LM_ERROR, 00236 "Error: %d - Unable to enable nonblocking on mcast_eh\n", 00237 errno )); 00238 } 00239 00240 if (this->recvbuf_size_ != 0 00241 && (((ACE_SOCK_Dgram *)socket)->set_option(SOL_SOCKET, 00242 SO_RCVBUF, 00243 (void *) &this->recvbuf_size_, 00244 sizeof (this->recvbuf_size_)) == -1) 00245 && errno != ENOTSUP ) 00246 { 00247 ACE_ERROR ((LM_ERROR, 00248 "Error: %d - Unable to set mcast_eh recvbuf_size:%d\n", 00249 errno, 00250 this->recvbuf_size_)); 00251 } 00252 (void) this->reactor ()->register_handler ( 00253 socket->get_handle (), 00254 this, 00255 ACE_Event_Handler::READ_MASK); 00256 } 00257 } |
|
Definition at line 156 of file ECG_Mcast_EH.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_ES_EVENT_UNDEFINED, Address_Set, RtecUDPAdmin::UDP_Addr::ipaddr, RtecUDPAdmin::UDP_Addr::port, and RtecEventComm::EventHeader::type.
00161 { 00162 CORBA::ULong count = sub.dependencies.length (); 00163 for (CORBA::ULong i = 0; i != count; ++i) 00164 { 00165 const RtecEventComm::EventHeader& header = 00166 sub.dependencies[i].event.header; 00167 if (0 < header.type && header.type < ACE_ES_EVENT_UNDEFINED) 00168 { 00169 continue; 00170 } 00171 RtecUDPAdmin::UDP_Addr addr; 00172 00173 this->receiver_->get_addr (header, addr ACE_ENV_ARG_PARAMETER); 00174 ACE_CHECK; 00175 00176 ACE_INET_Addr inet_addr (addr.port, addr.ipaddr); 00177 // Ignore errors, if the element is in the set we simply ignore 00178 // the problem... 00179 (void) multicast_addresses.insert (inet_addr); 00180 } 00181 } |
|
Definition at line 184 of file ECG_Mcast_EH.cpp. References Address_Set, ACE_Unbounded_Set< T >::find(), ACE_Event_Handler::reactor(), ACE_Unbounded_Set< T >::remove(), ACE_Reactor::remove_handler(), ACE_Array_Base< T >::size(), and subscriptions_.
00186 { 00187 for (size_t i = 0; i < this->subscriptions_.size (); ++i) 00188 { 00189 ACE_INET_Addr multicast_group = this->subscriptions_[i].mcast_addr; 00190 if (multicast_addresses.find (multicast_group)) 00191 { 00192 // Remove from the list of subscriptions to be added, 00193 // because we already subscribe to it... 00194 (void) multicast_addresses.remove (multicast_group); 00195 continue; 00196 } 00197 00198 // This subscription is no longer needed - remove from reactor, 00199 // close and delete the socket. 00200 ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram; 00201 (void) this->reactor ()->remove_handler (socket->get_handle (), 00202 ACE_Event_Handler::READ_MASK); 00203 (void) socket->close(); 00204 delete socket; 00205 // Move the deleted subscription out of the <subscriptions_> 00206 // array by moving the last subscription in array into its place. 00207 this->subscriptions_[i] = 00208 this->subscriptions_[this->subscriptions_.size () - 1]; 00209 this->subscriptions_.size (this->subscriptions_.size () - 1); 00210 --i; 00211 } 00212 00213 return 0; 00214 } |
|
Reactor callback. Notify receiver_ that a dgram corresponding to fd is ready for reading. Reimplemented from ACE_Event_Handler. Definition at line 123 of file ECG_Mcast_EH.cpp. References TAO_ECG_Dgram_Handler::handle_input(), ACE_Array_Base< T >::size(), and subscriptions_.
00124 { 00125 size_t subscriptions_size = this->subscriptions_.size (); 00126 for (size_t i = 0; i != subscriptions_size; ++i) 00127 { 00128 ACE_SOCK_Dgram_Mcast *socket = this->subscriptions_[i].dgram; 00129 if (socket->get_handle () == fd) 00130 { 00131 return this->receiver_->handle_input (*socket); 00132 } 00133 } 00134 return -1; 00135 } |
|
Register for changes in the EC subscription list. When the subscription list becomes non-empty we join the proper multicast groups (using Dgram_Handler to translate between event types and mcast groups) and the class registers itself with the reactor. To insure proper resource clean up, if open () is successful, the user MUST call shutdown () when handler is no longer needed (and its reactor still exists). Definition at line 39 of file ECG_Mcast_EH.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_ERROR, ACE_NEW, ACE_THROW, activate(), auto_observer_disconnect_, TAO_EC_Servant_Var< Observer >::in(), CORBA::is_nil(), LM_ERROR, RtecEventChannelAdmin::Observer_Handle, and TAO_EC_Auto_Command< Observer_Disconnect_Command >::set_command(). Referenced by TAO_ECG_Mcast_Gateway::init_handler().
00041 { 00042 if (!this->receiver_) 00043 { 00044 // We are shut down. 00045 ACE_THROW (CORBA::INTERNAL()); 00046 } 00047 00048 if (CORBA::is_nil (ec)) 00049 { 00050 ACE_ERROR ((LM_ERROR, "TAO_ECG_Mcast_EH::open(): " 00051 "nil ec argument")); 00052 ACE_THROW (CORBA::INTERNAL ()); 00053 } 00054 00055 // Create and activate Event Channel Observer. 00056 ACE_NEW (this->observer_, 00057 Observer (this)); 00058 00059 if (!this->observer_.in ()) 00060 { 00061 ACE_THROW (CORBA::NO_MEMORY ()); 00062 } 00063 00064 TAO_EC_Object_Deactivator observer_deactivator; 00065 RtecEventChannelAdmin::Observer_var observer_ref; 00066 PortableServer::POA_var poa = 00067 this->observer_->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); 00068 ACE_CHECK; 00069 00070 activate (observer_ref, 00071 poa.in (), 00072 this->observer_.in (), 00073 observer_deactivator 00074 ACE_ENV_ARG_PARAMETER); 00075 ACE_CHECK; 00076 00077 RtecEventChannelAdmin::Observer_Handle handle = 00078 ec->append_observer (observer_ref.in () 00079 ACE_ENV_ARG_PARAMETER); 00080 ACE_CHECK; 00081 00082 this->observer_->set_deactivator (observer_deactivator); 00083 this->auto_observer_disconnect_.set_command 00084 (Observer_Disconnect_Command (handle, ec)); 00085 } |
|
TAO_ECG_Handler_Shutdown method. Remove ourselves from the event channel, unsubscribe from the multicast groups, close the sockets and deregister from the reactor. Implements TAO_ECG_Handler_Shutdown. Definition at line 88 of file ECG_Mcast_EH.cpp. References auto_observer_disconnect_, TAO_EC_Auto_Command< Observer_Disconnect_Command >::execute(), TAO_EC_Servant_Var< Observer >::in(), ACE_Event_Handler::reactor(), ACE_Reactor::remove_handler(), ACE_Array_Base< T >::size(), and subscriptions_.
00089 { 00090 // Already shut down. 00091 if (!this->receiver_) 00092 return -1; 00093 00094 // Disconnect Observer from EC. 00095 this->auto_observer_disconnect_.execute (); 00096 00097 // Shutdown the observer. 00098 if (this->observer_.in ()) 00099 { 00100 this->observer_->shutdown (); 00101 this->observer_ = 0; 00102 } 00103 00104 // Indicates that we are in a shutdown state. 00105 this->receiver_ = 0; 00106 00107 // Deregister from reactor, close and clean up sockets. 00108 size_t subscriptions_size = this->subscriptions_.size (); 00109 for (size_t i = 0; i != subscriptions_size; ++i) 00110 { 00111 (void) this->reactor ()->remove_handler ( 00112 this->subscriptions_[i].dgram->get_handle (), 00113 ACE_Event_Handler::READ_MASK); 00114 (void) this->subscriptions_[i].dgram->close(); 00115 delete this->subscriptions_[i].dgram; 00116 } 00117 this->subscriptions_.size (0); 00118 00119 return 0; 00120 } |
|
The Observer method. Subscribe/unsubscribe to multicast groups according to changes in consumer subscriptions. Definition at line 138 of file ECG_Mcast_EH.cpp. References ACE_CHECK, ACE_ENV_ARG_PARAMETER, and Address_Set.
00142 { 00143 Address_Set multicast_addresses; 00144 00145 this->compute_required_subscriptions (sub, 00146 multicast_addresses 00147 ACE_ENV_ARG_PARAMETER); 00148 ACE_CHECK; 00149 00150 this->delete_unwanted_subscriptions (multicast_addresses); 00151 00152 this->add_new_subscriptions (multicast_addresses); 00153 } |
|
Make update_consumer () accessible to Observer.
Definition at line 146 of file ECG_Mcast_EH.h. |
|
Manages connection of our observer to the Event Channel. ORDER DEPENDENCY: this member should be declared AFTER . Definition at line 283 of file ECG_Mcast_EH.h. Referenced by open(), and shutdown(). |
|
The NIC used to subscribe for multicast traffic.
Definition at line 233 of file ECG_Mcast_EH.h. |
|
Event Channel Observer. Detects changes in EC consumer subscriptions. ORDER DEPENDENCY: this member should be declared before . Definition at line 279 of file ECG_Mcast_EH.h. |
|
We callback to this object when a message arrives.
Definition at line 271 of file ECG_Mcast_EH.h. |
|
SOCKbuf size.
Definition at line 274 of file ECG_Mcast_EH.h. Referenced by add_new_subscriptions(). |
|
List of multicast addresses we subscribe to and dgrams we use.
Definition at line 259 of file ECG_Mcast_EH.h. Referenced by add_new_subscriptions(), delete_unwanted_subscriptions(), handle_input(), and shutdown(). |