00001
00002
00003 #include "orbsvcs/Event/ECG_UDP_Receiver.h"
00004 #include "orbsvcs/Event_Utilities.h"
00005 #include "ace/SString.h"
00006
00007 #if !defined(__ACE_INLINE__)
00008 #include "orbsvcs/Event/ECG_UDP_Receiver.i"
00009 #endif
00010
00011 ACE_RCSID (Event,
00012 ECG_UDP_Receiver,
00013 "ECG_UDP_Receiver.cpp,v 1.13 2006/03/14 06:14:25 jtc Exp")
00014
00015
00016
00017
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019
00020 TAO_ECG_UDP_Receiver::~TAO_ECG_UDP_Receiver (void)
00021 {
00022 this->consumer_proxy_ =
00023 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00024
00025 if (this->handler_rptr_.get ())
00026 this->handler_rptr_->shutdown ();
00027 }
00028
00029 void
00030 TAO_ECG_UDP_Receiver::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
00031 TAO_ECG_Refcounted_Endpoint ignore_from,
00032 RtecUDPAdmin::AddrServer_ptr addr_server
00033 ACE_ENV_ARG_DECL)
00034 {
00035
00036
00037
00038 if (CORBA::is_nil (lcl_ec))
00039 {
00040 ACE_ERROR ((LM_ERROR,
00041 "TAO_ECG_UDP_Receiver::init(): "
00042 "<lcl_ec> argument is nil.\n"));
00043 ACE_THROW (CORBA::INTERNAL ());
00044 }
00045
00046 this->cdr_receiver_.init (ignore_from);
00047
00048 this->lcl_ec_ =
00049 RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec);
00050
00051 this->addr_server_ =
00052 RtecUDPAdmin::AddrServer::_duplicate (addr_server);
00053 }
00054
00055 void
00056 TAO_ECG_UDP_Receiver::connect (const RtecEventChannelAdmin::SupplierQOS& pub
00057 ACE_ENV_ARG_DECL)
00058 {
00059 if (CORBA::is_nil (this->lcl_ec_.in ()))
00060 {
00061 ACE_ERROR ((LM_ERROR,
00062 "Error initializing TAO_ECG_UDP_Receiver: "
00063 "init() hasn't been called before connect().\n"));
00064 ACE_THROW (CORBA::INTERNAL ());
00065 }
00066
00067 if (pub.publications.length () == 0)
00068 {
00069 ACE_ERROR ((LM_ERROR,
00070 "TAO_ECG_UDP_Receiver::connect(): "
00071 "0-length publications argument.\n"));
00072 ACE_THROW (CORBA::INTERNAL ());
00073 }
00074
00075 if (CORBA::is_nil (this->consumer_proxy_.in ()))
00076 {
00077 this->new_connect (pub ACE_ENV_ARG_PARAMETER);
00078 ACE_CHECK;
00079 }
00080 else
00081 {
00082 this->reconnect (pub ACE_ENV_ARG_PARAMETER);
00083 ACE_CHECK;
00084 }
00085 }
00086
00087 void
00088 TAO_ECG_UDP_Receiver::new_connect (const RtecEventChannelAdmin::SupplierQOS& pub
00089 ACE_ENV_ARG_DECL)
00090 {
00091
00092 RtecEventComm::PushSupplier_var supplier_ref;
00093 PortableServer::POA_var poa = this->_default_POA ();
00094
00095 TAO_EC_Object_Deactivator deactivator;
00096 activate (supplier_ref,
00097 poa.in (),
00098 this,
00099 deactivator
00100 ACE_ENV_ARG_PARAMETER);
00101 ACE_CHECK;
00102
00103
00104 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
00105 this->lcl_ec_->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
00106 ACE_CHECK;
00107
00108 RtecEventChannelAdmin::ProxyPushConsumer_var proxy =
00109 supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00110 ACE_CHECK;
00111 ECG_Receiver_Auto_Proxy_Disconnect new_proxy_disconnect (proxy.in ());
00112
00113 proxy->connect_push_supplier (supplier_ref.in (),
00114 pub
00115 ACE_ENV_ARG_PARAMETER);
00116 ACE_CHECK;
00117
00118
00119 this->consumer_proxy_ = proxy._retn ();
00120 this->auto_proxy_disconnect_.set_command (new_proxy_disconnect);
00121 this->set_deactivator (deactivator);
00122 }
00123
00124 void
00125 TAO_ECG_UDP_Receiver::reconnect (const RtecEventChannelAdmin::SupplierQOS& pub
00126 ACE_ENV_ARG_DECL)
00127 {
00128
00129 RtecEventComm::PushSupplier_var supplier_ref;
00130 PortableServer::POA_var poa = this->_default_POA ();
00131
00132 CORBA::Object_var obj = poa->servant_to_reference (this ACE_ENV_ARG_PARAMETER);
00133 ACE_CHECK;
00134 supplier_ref =
00135 RtecEventComm::PushSupplier::_narrow (obj.in () ACE_ENV_ARG_PARAMETER);
00136 ACE_CHECK;
00137
00138 if (CORBA::is_nil (supplier_ref.in ()))
00139 {
00140 ACE_THROW (CORBA::INTERNAL ());
00141 }
00142
00143
00144 this->consumer_proxy_->connect_push_supplier (supplier_ref.in (),
00145 pub
00146 ACE_ENV_ARG_PARAMETER);
00147 ACE_CHECK;
00148 }
00149
00150 void
00151 TAO_ECG_UDP_Receiver::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL)
00152 ACE_THROW_SPEC ((CORBA::SystemException))
00153 {
00154
00155 this->auto_proxy_disconnect_.disallow_command ();
00156
00157 this->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00158 }
00159
00160 void
00161 TAO_ECG_UDP_Receiver::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00162 {
00163 if (this->handler_rptr_.get ())
00164 this->handler_rptr_->shutdown ();
00165 TAO_ECG_Refcounted_Handler empty_handler_rptr;
00166 this->handler_rptr_ = empty_handler_rptr;
00167
00168 this->consumer_proxy_ =
00169 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00170
00171 this->auto_proxy_disconnect_.execute ();
00172
00173 this->deactivator_.deactivate ();
00174
00175 this->cdr_receiver_.shutdown ();
00176 }
00177
00178
00179 class TAO_ECG_Event_CDR_Decoder: public TAO_ECG_CDR_Processor
00180 {
00181 public:
00182 virtual int decode (TAO_InputCDR &cdr);
00183
00184 RtecEventComm::EventSet events;
00185 };
00186
00187 int
00188 TAO_ECG_Event_CDR_Decoder::decode (TAO_InputCDR &cdr)
00189 {
00190 if (!(cdr >> this->events))
00191 {
00192 ACE_ERROR_RETURN ((LM_ERROR,
00193 "Error decoding events cdr.\n"),
00194 -1);
00195 }
00196 return 0;
00197 }
00198
00199 int
00200 TAO_ECG_UDP_Receiver::handle_input (ACE_SOCK_Dgram& dgram)
00201 {
00202 ACE_DECLARE_NEW_CORBA_ENV;
00203 ACE_TRY
00204 {
00205
00206
00207 if (CORBA::is_nil (this->consumer_proxy_.in ()))
00208 {
00209 ACE_ERROR ((LM_ERROR,
00210 "TAO_ECG_UDP_Receiver::handle_input() "
00211 "called but the Receiver is not connected "
00212 "to an event channel. Shutting down the Receiver.\n"));
00213 this->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00214 ACE_TRY_CHECK;
00215
00216 return 0;
00217 }
00218
00219
00220 TAO_ECG_Event_CDR_Decoder cdr_decoder;
00221 int result = this->cdr_receiver_.handle_input (dgram, &cdr_decoder);
00222
00223 if (result == 0)
00224
00225 {
00226 return 0;
00227 }
00228 if (result == -1)
00229 {
00230 ACE_ERROR_RETURN ((LM_ERROR,
00231 "Error receiving multicasted events.\n"),
00232 0);
00233 }
00234
00235 this->consumer_proxy_->push (cdr_decoder.events ACE_ENV_ARG_PARAMETER);
00236 ACE_TRY_CHECK;
00237 }
00238
00239 ACE_CATCHANY
00240 {
00241 ACE_DEBUG ((LM_ERROR,
00242 "Caught and swallowed EXCEPTION in "
00243 "ECG_UDP_Receiver::handle_input: %s\n",
00244 ACE_ANY_EXCEPTION._info ().c_str ()));
00245 }
00246 ACE_ENDTRY;
00247 return 0;
00248 }
00249
00250 TAO_END_VERSIONED_NAMESPACE_DECL