00001
00002
00003 #include "orbsvcs/Event/ECG_UDP_Receiver.h"
00004 #include "orbsvcs/Event_Utilities.h"
00005 #include "ace/SString.h"
00006
00007 #if !defined(__ACE_INLINE__)
00008 #include "orbsvcs/Event/ECG_UDP_Receiver.inl"
00009 #endif
00010
00011 ACE_RCSID (Event,
00012 ECG_UDP_Receiver,
00013 "$Id: ECG_UDP_Receiver.cpp 78820 2007-07-07 20:06:46Z sowayaa $")
00014
00015
00016
00017
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019
00020 TAO_ECG_UDP_Receiver::~TAO_ECG_UDP_Receiver (void)
00021 {
00022 this->consumer_proxy_ =
00023 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00024
00025 if (this->handler_rptr_.get ())
00026 this->handler_rptr_->shutdown ();
00027 }
00028
00029 void
00030 TAO_ECG_UDP_Receiver::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
00031 TAO_ECG_Refcounted_Endpoint ignore_from,
00032 RtecUDPAdmin::AddrServer_ptr addr_server)
00033 {
00034
00035
00036
00037 if (CORBA::is_nil (lcl_ec))
00038 {
00039 ACE_ERROR ((LM_ERROR,
00040 "TAO_ECG_UDP_Receiver::init(): "
00041 "<lcl_ec> argument is nil.\n"));
00042 throw CORBA::INTERNAL ();
00043 }
00044
00045 this->cdr_receiver_.init (ignore_from);
00046
00047 this->lcl_ec_ =
00048 RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec);
00049
00050 this->addr_server_ =
00051 RtecUDPAdmin::AddrServer::_duplicate (addr_server);
00052 }
00053
00054 void
00055 TAO_ECG_UDP_Receiver::connect (const RtecEventChannelAdmin::SupplierQOS& pub)
00056 {
00057 if (CORBA::is_nil (this->lcl_ec_.in ()))
00058 {
00059
00060 ACE_ERROR ((LM_ERROR,
00061 "Error initializing TAO_ECG_UDP_Receiver: "
00062 "init() hasn't been called before connect().\n"));
00063
00064
00065 throw CORBA::INTERNAL ();
00066 }
00067
00068 if (pub.publications.length () == 0)
00069 {
00070 ACE_ERROR ((LM_ERROR,
00071 "TAO_ECG_UDP_Receiver::connect(): "
00072 "0-length publications argument.\n"));
00073 throw CORBA::INTERNAL ();
00074 }
00075
00076 if (CORBA::is_nil (this->consumer_proxy_.in ()))
00077 {
00078 this->new_connect (pub);
00079 }
00080 else
00081 {
00082 this->reconnect (pub);
00083 }
00084 }
00085
00086 void
00087 TAO_ECG_UDP_Receiver::new_connect (const RtecEventChannelAdmin::SupplierQOS& pub)
00088 {
00089
00090 RtecEventComm::PushSupplier_var supplier_ref;
00091 PortableServer::POA_var poa = this->_default_POA ();
00092
00093 TAO_EC_Object_Deactivator deactivator;
00094 activate (supplier_ref,
00095 poa.in (),
00096 this,
00097 deactivator);
00098
00099
00100 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
00101 this->lcl_ec_->for_suppliers ();
00102
00103 RtecEventChannelAdmin::ProxyPushConsumer_var proxy =
00104 supplier_admin->obtain_push_consumer ();
00105 ECG_Receiver_Auto_Proxy_Disconnect new_proxy_disconnect (proxy.in ());
00106
00107 proxy->connect_push_supplier (supplier_ref.in (),
00108 pub);
00109
00110
00111 this->consumer_proxy_ = proxy._retn ();
00112 this->auto_proxy_disconnect_.set_command (new_proxy_disconnect);
00113 this->set_deactivator (deactivator);
00114 }
00115
00116 void
00117 TAO_ECG_UDP_Receiver::reconnect (const RtecEventChannelAdmin::SupplierQOS& pub)
00118 {
00119
00120 RtecEventComm::PushSupplier_var supplier_ref;
00121 PortableServer::POA_var poa = this->_default_POA ();
00122
00123 CORBA::Object_var obj = poa->servant_to_reference (this);
00124 supplier_ref =
00125 RtecEventComm::PushSupplier::_narrow (obj.in ());
00126
00127 if (CORBA::is_nil (supplier_ref.in ()))
00128 {
00129 throw CORBA::INTERNAL ();
00130 }
00131
00132
00133 this->consumer_proxy_->connect_push_supplier (supplier_ref.in (),
00134 pub);
00135 }
00136
00137 void
00138 TAO_ECG_UDP_Receiver::disconnect_push_supplier (void)
00139 {
00140
00141 this->auto_proxy_disconnect_.disallow_command ();
00142
00143 this->shutdown ();
00144 }
00145
00146 void
00147 TAO_ECG_UDP_Receiver::shutdown (void)
00148 {
00149 if (this->handler_rptr_.get ())
00150 this->handler_rptr_->shutdown ();
00151 TAO_ECG_Refcounted_Handler empty_handler_rptr;
00152 this->handler_rptr_ = empty_handler_rptr;
00153
00154 this->consumer_proxy_ =
00155 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00156
00157 this->auto_proxy_disconnect_.execute ();
00158
00159 this->deactivator_.deactivate ();
00160
00161 this->cdr_receiver_.shutdown ();
00162 }
00163
00164
00165 class TAO_ECG_Event_CDR_Decoder: public TAO_ECG_CDR_Processor
00166 {
00167 public:
00168 virtual int decode (TAO_InputCDR &cdr);
00169
00170 RtecEventComm::EventSet events;
00171 };
00172
00173 int
00174 TAO_ECG_Event_CDR_Decoder::decode (TAO_InputCDR &cdr)
00175 {
00176 if (!(cdr >> this->events))
00177 {
00178 ACE_ERROR_RETURN ((LM_ERROR,
00179 "Error decoding events cdr.\n"),
00180 -1);
00181 }
00182 return 0;
00183 }
00184
00185 int
00186 TAO_ECG_UDP_Receiver::handle_input (ACE_SOCK_Dgram& dgram)
00187 {
00188 try
00189 {
00190
00191
00192 if (CORBA::is_nil (this->consumer_proxy_.in ()))
00193 {
00194 ACE_ERROR ((LM_ERROR,
00195 "TAO_ECG_UDP_Receiver::handle_input() "
00196 "called but the Receiver is not connected "
00197 "to an event channel. Shutting down the Receiver.\n"));
00198 this->shutdown ();
00199
00200 return 0;
00201 }
00202
00203
00204 TAO_ECG_Event_CDR_Decoder cdr_decoder;
00205 int const result = this->cdr_receiver_.handle_input (dgram, &cdr_decoder);
00206
00207 if (result == 0)
00208
00209 {
00210 return 0;
00211 }
00212 if (result == -1)
00213 {
00214 ACE_ERROR_RETURN ((LM_ERROR,
00215 "Error receiving multicasted events.\n"),
00216 0);
00217 }
00218
00219 this->consumer_proxy_->push (cdr_decoder.events);
00220 }
00221
00222 catch (const CORBA::Exception& ex)
00223 {
00224 ACE_ERROR ((LM_ERROR,
00225 "Caught and swallowed EXCEPTION in "
00226 "ECG_UDP_Receiver::handle_input: %s\n",
00227 ex._info ().c_str ()));
00228 }
00229 return 0;
00230 }
00231
00232 TAO_END_VERSIONED_NAMESPACE_DECL