#include <ECG_UDP_Receiver.h>
Inheritance diagram for TAO_ECG_UDP_Receiver:
~TAO_ECG_UDP_Receiver (void) | |
void | init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, TAO_ECG_Refcounted_Endpoint ignore_from, RtecUDPAdmin::AddrServer_ptr addr_server) |
void | connect (const RtecEventChannelAdmin::SupplierQOS &pub) |
Connect or reconnect to the EC with the given publications. | |
void | set_handler_shutdown (TAO_ECG_Refcounted_Handler handler_shutdown_rptr) |
void | shutdown (void) |
static TAO_EC_Servant_Var< TAO_ECG_UDP_Receiver > | create (CORBA::Boolean perform_crc=0) |
Initialization and termination methods. | |
Public Member Functions | |
void | get_addr (const RtecEventComm::EventHeader &header, RtecUDPAdmin::UDP_Addr_out addr) |
void | get_address (const RtecEventComm::EventHeader &header, RtecUDPAdmin::UDP_Address_out addr) |
virtual void | disconnect_push_supplier (void) |
virtual int | handle_input (ACE_SOCK_Dgram &dgram) |
TAO_ECG_Dgram_Handler method. | |
Protected Member Functions | |
TAO_ECG_UDP_Receiver (CORBA::Boolean perform_crc=false) | |
Private Types | |
typedef TAO_EC_Auto_Command< TAO_ECG_UDP_Receiver_Disconnect_Command > | ECG_Receiver_Auto_Proxy_Disconnect |
Private Member Functions | |
void | new_connect (const RtecEventChannelAdmin::SupplierQOS &pub) |
Helpers for the connect() method. | |
void | reconnect (const RtecEventChannelAdmin::SupplierQOS &pub) |
Helpers for the connect() method. | |
Private Attributes | |
RtecEventChannelAdmin::EventChannel_var | lcl_ec_ |
Event Channel to which we act as a supplier. | |
RtecUDPAdmin::AddrServer_var | addr_server_ |
The server used to map event types to multicast groups. | |
RtecEventChannelAdmin::ProxyPushConsumer_var | consumer_proxy_ |
Proxy used to supply events to the Event Channel. | |
TAO_ECG_CDR_Message_Receiver | cdr_receiver_ |
TAO_ECG_Refcounted_Handler | handler_rptr_ |
ECG_Receiver_Auto_Proxy_Disconnect | auto_proxy_disconnect_ |
Manages our connection to Consumer Proxy. |
This class connects as a supplier to an EventChannel, and supplies to it all events it receives via UDP or Multicast.
Definition at line 103 of file ECG_UDP_Receiver.h.
typedef TAO_EC_Auto_Command<TAO_ECG_UDP_Receiver_Disconnect_Command> TAO_ECG_UDP_Receiver::ECG_Receiver_Auto_Proxy_Disconnect [private] |
Definition at line 226 of file ECG_UDP_Receiver.h.
TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_ECG_UDP_Receiver::~TAO_ECG_UDP_Receiver | ( | void | ) |
Definition at line 20 of file ECG_UDP_Receiver.cpp.
00021 { 00022 this->consumer_proxy_ = 00023 RtecEventChannelAdmin::ProxyPushConsumer::_nil (); 00024 00025 if (this->handler_rptr_.get ()) 00026 this->handler_rptr_->shutdown (); 00027 }
TAO_ECG_UDP_Receiver::TAO_ECG_UDP_Receiver | ( | CORBA::Boolean | perform_crc = false |
) | [protected] |
Constructor (protected). Clients can create new TAO_ECG_UDP_Receiver objects using the static create() method.
void TAO_ECG_UDP_Receiver::connect | ( | const RtecEventChannelAdmin::SupplierQOS & | pub | ) |
Connect or reconnect to the EC with the given publications.
Definition at line 55 of file ECG_UDP_Receiver.cpp.
References ACE_ERROR, CORBA::is_nil(), LM_ERROR, new_connect(), RtecEventChannelAdmin::SupplierQOS::publications, and reconnect().
00056 { 00057 if (CORBA::is_nil (this->lcl_ec_.in ())) 00058 { 00059 //FUZZ: disable check_for_lack_ACE_OS 00060 ACE_ERROR ((LM_ERROR, 00061 "Error initializing TAO_ECG_UDP_Receiver: " 00062 "init() hasn't been called before connect().\n")); 00063 //FUZZ: enable check_for_lack_ACE_OS 00064 00065 throw CORBA::INTERNAL (); 00066 } 00067 00068 if (pub.publications.length () == 0) 00069 { 00070 ACE_ERROR ((LM_ERROR, 00071 "TAO_ECG_UDP_Receiver::connect(): " 00072 "0-length publications argument.\n")); 00073 throw CORBA::INTERNAL (); 00074 } 00075 00076 if (CORBA::is_nil (this->consumer_proxy_.in ())) 00077 { 00078 this->new_connect (pub); 00079 } 00080 else 00081 { 00082 this->reconnect (pub); 00083 } 00084 }
static TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> TAO_ECG_UDP_Receiver::create | ( | CORBA::Boolean | perform_crc = 0 |
) | [static] |
Initialization and termination methods.
Create a new TAO_ECG_UDP_Receiver object. (Constructor access is restricted to ensure that all TAO_ECG_UDP_Receiver objects are heap-allocated.)
Referenced by TAO_ECG_Mcast_Gateway::init_receiver().
void TAO_ECG_UDP_Receiver::disconnect_push_supplier | ( | void | ) | [virtual] |
The PushSupplier IDL method. Invokes shutdown(), which may result in the object being deleted if refcounting is used to manage its lifetime.
Definition at line 138 of file ECG_UDP_Receiver.cpp.
References auto_proxy_disconnect_, TAO_EC_Auto_Command< T >::disallow_command(), and shutdown().
00139 { 00140 // Prevent attempts to disconnect. 00141 this->auto_proxy_disconnect_.disallow_command (); 00142 00143 this->shutdown (); 00144 }
void TAO_ECG_UDP_Receiver::get_addr | ( | const RtecEventComm::EventHeader & | header, | |
RtecUDPAdmin::UDP_Addr_out | addr | |||
) | [virtual] |
Accessor. Calls RtecUDPAdmin::AddrServer::get_addr. Throws an exception if a nil Address Server was specified in init ().
Implements TAO_ECG_Dgram_Handler.
void TAO_ECG_UDP_Receiver::get_address | ( | const RtecEventComm::EventHeader & | header, | |
RtecUDPAdmin::UDP_Address_out | addr | |||
) | [virtual] |
Calls RtecUDPAdmin::AddrServer::get_address. Throws an exception if a nil Address Server was specified in init ().
Implements TAO_ECG_Dgram_Handler.
int TAO_ECG_UDP_Receiver::handle_input | ( | ACE_SOCK_Dgram & | dgram | ) | [virtual] |
TAO_ECG_Dgram_Handler method.
UDP/Multicast Event_Handlers call this method when data is available at the socket, i.e., when the <dgram> is ready for reading. Data is read from the socket, and, if a complete message is received, the event is pushed to the local Event Channel.
Implements TAO_ECG_Dgram_Handler.
Definition at line 186 of file ECG_UDP_Receiver.cpp.
References CORBA::Exception::_info(), ACE_ERROR, ACE_ERROR_RETURN, ACE_String_Base< CHAR >::c_str(), cdr_receiver_, consumer_proxy_, TAO_ECG_Event_CDR_Decoder::events, TAO_ECG_CDR_Message_Receiver::handle_input(), CORBA::is_nil(), LM_ERROR, and shutdown().
00187 { 00188 try 00189 { 00190 // Make sure we are connected to the Event Channel before proceeding 00191 // any further. 00192 if (CORBA::is_nil (this->consumer_proxy_.in ())) 00193 { 00194 ACE_ERROR ((LM_ERROR, 00195 "TAO_ECG_UDP_Receiver::handle_input() " 00196 "called but the Receiver is not connected " 00197 "to an event channel. Shutting down the Receiver.\n")); 00198 this->shutdown (); 00199 00200 return 0; 00201 } 00202 00203 // Receive data. 00204 TAO_ECG_Event_CDR_Decoder cdr_decoder; 00205 int const result = this->cdr_receiver_.handle_input (dgram, &cdr_decoder); 00206 00207 if (result == 0) 00208 // No data to act on. 00209 { 00210 return 0; 00211 } 00212 if (result == -1) 00213 { 00214 ACE_ERROR_RETURN ((LM_ERROR, 00215 "Error receiving multicasted events.\n"), 00216 0); 00217 } 00218 00219 this->consumer_proxy_->push (cdr_decoder.events); 00220 } 00221 00222 catch (const CORBA::Exception& ex) 00223 { 00224 ACE_ERROR ((LM_ERROR, 00225 "Caught and swallowed EXCEPTION in " 00226 "ECG_UDP_Receiver::handle_input: %s\n", 00227 ex._info ().c_str ())); 00228 } 00229 return 0; 00230 }
void TAO_ECG_UDP_Receiver::init | ( | RtecEventChannelAdmin::EventChannel_ptr | lcl_ec, | |
TAO_ECG_Refcounted_Endpoint | ignore_from, | |||
RtecUDPAdmin::AddrServer_ptr | addr_server | |||
) |
lcl_ec | Event Channel to which we will act as a supplier of events. | |
ignore_from | Endpoint used to remove events generated by the same process. | |
addr_server | Address server used to obtain the mapping of event type to multicast group. To ensure proper resource cleanup, if init () is successful, shutdown () must be called when the receiver is no longer needed. This is done by the disconnect_push_supplier() method. If disconnect_push_supplier() will not be called, calling shutdown() is the responsibility of the user. Furthermore, if shutdown() is not explicitly called by either disconnect_push_supplier () or the user, the receiver will clean up the resources in its destructor; however, certain entities involved in cleanup must still exist at that point, e.g., the POA. |
Definition at line 30 of file ECG_UDP_Receiver.cpp.
References ACE_ERROR, addr_server_, cdr_receiver_, TAO_ECG_CDR_Message_Receiver::init(), CORBA::is_nil(), lcl_ec_, and LM_ERROR.
00033 { 00034 // Verify arguments. 00035 // <addr_server> is allowed to be nil. But then, if get_addr () method 00036 // is invoked, it throws an exception. 00037 if (CORBA::is_nil (lcl_ec)) 00038 { 00039 ACE_ERROR ((LM_ERROR, 00040 "TAO_ECG_UDP_Receiver::init(): " 00041 "<lcl_ec> argument is nil.\n")); 00042 throw CORBA::INTERNAL (); 00043 } 00044 00045 this->cdr_receiver_.init (ignore_from); 00046 00047 this->lcl_ec_ = 00048 RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec); 00049 00050 this->addr_server_ = 00051 RtecUDPAdmin::AddrServer::_duplicate (addr_server); 00052 }
void TAO_ECG_UDP_Receiver::new_connect | ( | const RtecEventChannelAdmin::SupplierQOS & | pub | ) | [private] |
Helpers for the connect() method.
Definition at line 87 of file ECG_UDP_Receiver.cpp.
References activate(), auto_proxy_disconnect_, consumer_proxy_, TAO_Objref_Var_T< T >::in(), lcl_ec_, TAO_EC_Auto_Command< T >::set_command(), and TAO_EC_Deactivated_Object::set_deactivator().
Referenced by connect().
00088 { 00089 // Activate with poa. 00090 RtecEventComm::PushSupplier_var supplier_ref; 00091 PortableServer::POA_var poa = this->_default_POA (); 00092 00093 TAO_EC_Object_Deactivator deactivator; 00094 activate (supplier_ref, 00095 poa.in (), 00096 this, 00097 deactivator); 00098 00099 // Connect as a supplier to the local EC. 00100 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = 00101 this->lcl_ec_->for_suppliers (); 00102 00103 RtecEventChannelAdmin::ProxyPushConsumer_var proxy = 00104 supplier_admin->obtain_push_consumer (); 00105 ECG_Receiver_Auto_Proxy_Disconnect new_proxy_disconnect (proxy.in ()); 00106 00107 proxy->connect_push_supplier (supplier_ref.in (), 00108 pub); 00109 00110 // Update resource managers. 00111 this->consumer_proxy_ = proxy._retn (); 00112 this->auto_proxy_disconnect_.set_command (new_proxy_disconnect); 00113 this->set_deactivator (deactivator); 00114 }
void TAO_ECG_UDP_Receiver::reconnect | ( | const RtecEventChannelAdmin::SupplierQOS & | pub | ) | [private] |
Helpers for the connect() method.
Definition at line 117 of file ECG_UDP_Receiver.cpp.
References consumer_proxy_, TAO_Pseudo_Var_T< T >::in(), and CORBA::is_nil().
Referenced by connect().
00118 { 00119 // Obtain our object reference from the POA. 00120 RtecEventComm::PushSupplier_var supplier_ref; 00121 PortableServer::POA_var poa = this->_default_POA (); 00122 00123 CORBA::Object_var obj = poa->servant_to_reference (this); 00124 supplier_ref = 00125 RtecEventComm::PushSupplier::_narrow (obj.in ()); 00126 00127 if (CORBA::is_nil (supplier_ref.in ())) 00128 { 00129 throw CORBA::INTERNAL (); 00130 } 00131 00132 // Reconnect. 00133 this->consumer_proxy_->connect_push_supplier (supplier_ref.in (), 00134 pub); 00135 }
void TAO_ECG_UDP_Receiver::set_handler_shutdown | ( | TAO_ECG_Refcounted_Handler | handler_shutdown_rptr | ) |
Set the handler we must notify when shutdown occurs. (This is the handler that alerts us when data is available on the UDP/multicast socket.) Shutdown notification gives the handler an opportunity to properly clean up resources.
void TAO_ECG_UDP_Receiver::shutdown | ( | void | ) |
If this class is used with refcounting, calling this method may result in the reference count being decremented (due to deactivation) and the object being deleted.
Definition at line 147 of file ECG_UDP_Receiver.cpp.
References auto_proxy_disconnect_, cdr_receiver_, consumer_proxy_, TAO_EC_Object_Deactivator::deactivate(), TAO_EC_Deactivated_Object::deactivator_, TAO_EC_Auto_Command< T >::execute(), handler_rptr_, and TAO_ECG_CDR_Message_Receiver::shutdown().
Referenced by disconnect_push_supplier(), and handle_input().
00148 { 00149 if (this->handler_rptr_.get ()) 00150 this->handler_rptr_->shutdown (); 00151 TAO_ECG_Refcounted_Handler empty_handler_rptr; 00152 this->handler_rptr_ = empty_handler_rptr; 00153 00154 this->consumer_proxy_ = 00155 RtecEventChannelAdmin::ProxyPushConsumer::_nil (); 00156 00157 this->auto_proxy_disconnect_.execute (); 00158 00159 this->deactivator_.deactivate (); 00160 00161 this->cdr_receiver_.shutdown (); 00162 }
RtecUDPAdmin::AddrServer_var TAO_ECG_UDP_Receiver::addr_server_ [private] |
The server used to map event types to multicast groups.
Definition at line 211 of file ECG_UDP_Receiver.h.
Referenced by init().
ECG_Receiver_Auto_Proxy_Disconnect TAO_ECG_UDP_Receiver::auto_proxy_disconnect_ [private] |
Manages our connection to Consumer Proxy.
Definition at line 228 of file ECG_UDP_Receiver.h.
Referenced by disconnect_push_supplier(), new_connect(), and shutdown().
TAO_ECG_CDR_Message_Receiver TAO_ECG_UDP_Receiver::cdr_receiver_ [private] |
Helper for reading incoming UDP/Multicast messages. It assembles message fragments and provides access to a CDR stream once the complete message has been received.
Definition at line 219 of file ECG_UDP_Receiver.h.
Referenced by handle_input(), init(), and shutdown().
RtecEventChannelAdmin::ProxyPushConsumer_var TAO_ECG_UDP_Receiver::consumer_proxy_ [private] |
Proxy used to supply events to the Event Channel.
Definition at line 214 of file ECG_UDP_Receiver.h.
Referenced by handle_input(), new_connect(), reconnect(), and shutdown().
TAO_ECG_Refcounted_Handler TAO_ECG_UDP_Receiver::handler_rptr_ [private] |
Handler we must notify when shutdown occurs, so it has an opportunity to clean up resources.
Definition at line 223 of file ECG_UDP_Receiver.h.
Referenced by shutdown().
RtecEventChannelAdmin::EventChannel_var TAO_ECG_UDP_Receiver::lcl_ec_ [private] |
Event Channel to which we act as a supplier.
Definition at line 208 of file ECG_UDP_Receiver.h.
Referenced by init(), and new_connect().