ECG_UDP_Receiver.cpp

Go to the documentation of this file.
00001 // $Id: ECG_UDP_Receiver.cpp 78820 2007-07-07 20:06:46Z sowayaa $
00002 
00003 #include "orbsvcs/Event/ECG_UDP_Receiver.h"
00004 #include "orbsvcs/Event_Utilities.h"
00005 #include "ace/SString.h"
00006 
00007 #if !defined(__ACE_INLINE__)
00008 #include "orbsvcs/Event/ECG_UDP_Receiver.inl"
00009 #endif /* __ACE_INLINE__ */
00010 
00011 ACE_RCSID (Event,
00012            ECG_UDP_Receiver,
00013            "$Id: ECG_UDP_Receiver.cpp 78820 2007-07-07 20:06:46Z sowayaa $")
00014 
00015 
00016 // ****************************************************************
00017 
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019 
00020 TAO_ECG_UDP_Receiver::~TAO_ECG_UDP_Receiver (void)
00021 {
00022   this->consumer_proxy_ =
00023     RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00024 
00025   if (this->handler_rptr_.get ())
00026     this->handler_rptr_->shutdown ();
00027 }
00028 
00029 void
00030 TAO_ECG_UDP_Receiver::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
00031                             TAO_ECG_Refcounted_Endpoint ignore_from,
00032                             RtecUDPAdmin::AddrServer_ptr addr_server)
00033 {
00034   // Verify arguments.
00035   // <addr_server> is allowed to be nil.  But then, if get_addr () method
00036   // is invoked, it throws an exception.
00037   if (CORBA::is_nil (lcl_ec))
00038     {
00039       ACE_ERROR ((LM_ERROR,
00040                   "TAO_ECG_UDP_Receiver::init(): "
00041                   "<lcl_ec> argument is nil.\n"));
00042       throw CORBA::INTERNAL ();
00043     }
00044 
00045   this->cdr_receiver_.init (ignore_from);
00046 
00047   this->lcl_ec_ =
00048     RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec);
00049 
00050   this->addr_server_ =
00051     RtecUDPAdmin::AddrServer::_duplicate (addr_server);
00052 }
00053 
00054 void
00055 TAO_ECG_UDP_Receiver::connect (const RtecEventChannelAdmin::SupplierQOS& pub)
00056 {
00057   if (CORBA::is_nil (this->lcl_ec_.in ()))
00058     {
00059       //FUZZ: disable check_for_lack_ACE_OS
00060       ACE_ERROR ((LM_ERROR,
00061                   "Error initializing TAO_ECG_UDP_Receiver: "
00062                   "init() hasn't been called before connect().\n"));
00063       //FUZZ: enable check_for_lack_ACE_OS
00064 
00065       throw CORBA::INTERNAL ();
00066     }
00067 
00068   if (pub.publications.length () == 0)
00069     {
00070       ACE_ERROR ((LM_ERROR,
00071                   "TAO_ECG_UDP_Receiver::connect(): "
00072                   "0-length publications argument.\n"));
00073       throw CORBA::INTERNAL ();
00074     }
00075 
00076   if (CORBA::is_nil (this->consumer_proxy_.in ()))
00077     {
00078       this->new_connect (pub);
00079     }
00080   else
00081     {
00082       this->reconnect (pub);
00083     }
00084 }
00085 
00086 void
00087 TAO_ECG_UDP_Receiver::new_connect (const RtecEventChannelAdmin::SupplierQOS& pub)
00088 {
00089   // Activate with poa.
00090   RtecEventComm::PushSupplier_var supplier_ref;
00091   PortableServer::POA_var poa = this->_default_POA ();
00092 
00093   TAO_EC_Object_Deactivator deactivator;
00094   activate (supplier_ref,
00095             poa.in (),
00096             this,
00097             deactivator);
00098 
00099   // Connect as a supplier to the local EC.
00100   RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
00101     this->lcl_ec_->for_suppliers ();
00102 
00103   RtecEventChannelAdmin::ProxyPushConsumer_var proxy =
00104     supplier_admin->obtain_push_consumer ();
00105   ECG_Receiver_Auto_Proxy_Disconnect new_proxy_disconnect (proxy.in ());
00106 
00107   proxy->connect_push_supplier (supplier_ref.in (),
00108                                 pub);
00109 
00110   // Update resource managers.
00111   this->consumer_proxy_ = proxy._retn ();
00112   this->auto_proxy_disconnect_.set_command (new_proxy_disconnect);
00113   this->set_deactivator (deactivator);
00114 }
00115 
00116 void
00117 TAO_ECG_UDP_Receiver::reconnect (const RtecEventChannelAdmin::SupplierQOS& pub)
00118 {
00119   // Obtain our object reference from the POA.
00120   RtecEventComm::PushSupplier_var supplier_ref;
00121   PortableServer::POA_var poa = this->_default_POA ();
00122 
00123   CORBA::Object_var obj = poa->servant_to_reference (this);
00124   supplier_ref =
00125     RtecEventComm::PushSupplier::_narrow (obj.in ());
00126 
00127   if (CORBA::is_nil (supplier_ref.in ()))
00128     {
00129       throw CORBA::INTERNAL ();
00130     }
00131 
00132   // Reconnect.
00133   this->consumer_proxy_->connect_push_supplier (supplier_ref.in (),
00134                                                 pub);
00135 }
00136 
00137 void
00138 TAO_ECG_UDP_Receiver::disconnect_push_supplier (void)
00139 {
00140   // Prevent attempts to disconnect.
00141   this->auto_proxy_disconnect_.disallow_command ();
00142 
00143   this->shutdown ();
00144 }
00145 
00146 void
00147 TAO_ECG_UDP_Receiver::shutdown (void)
00148 {
00149   if (this->handler_rptr_.get ())
00150     this->handler_rptr_->shutdown ();
00151   TAO_ECG_Refcounted_Handler empty_handler_rptr;
00152   this->handler_rptr_ = empty_handler_rptr;
00153 
00154   this->consumer_proxy_ =
00155     RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00156 
00157   this->auto_proxy_disconnect_.execute ();
00158 
00159   this->deactivator_.deactivate ();
00160 
00161   this->cdr_receiver_.shutdown ();
00162 }
00163 
00164 // Helper class for using <cdr_receiver_>.
00165 class TAO_ECG_Event_CDR_Decoder: public TAO_ECG_CDR_Processor
00166 {
00167 public:
00168   virtual int decode (TAO_InputCDR &cdr);
00169 
00170   RtecEventComm::EventSet events;
00171 };
00172 
00173 int
00174 TAO_ECG_Event_CDR_Decoder::decode (TAO_InputCDR &cdr)
00175 {
00176   if (!(cdr >> this->events))
00177     {
00178       ACE_ERROR_RETURN ((LM_ERROR,
00179                          "Error decoding events cdr.\n"),
00180                         -1);
00181     }
00182   return 0;
00183 }
00184 
00185 int
00186 TAO_ECG_UDP_Receiver::handle_input (ACE_SOCK_Dgram& dgram)
00187 {
00188   try
00189     {
00190       // Make sure we are connected to the Event Channel before proceeding
00191       // any further.
00192       if (CORBA::is_nil (this->consumer_proxy_.in ()))
00193         {
00194           ACE_ERROR ((LM_ERROR,
00195                       "TAO_ECG_UDP_Receiver::handle_input() "
00196                       "called but the Receiver is not connected "
00197                       "to an event channel. Shutting down the Receiver.\n"));
00198           this->shutdown ();
00199 
00200           return 0;
00201         }
00202 
00203       // Receive data.
00204       TAO_ECG_Event_CDR_Decoder cdr_decoder;
00205       int const result = this->cdr_receiver_.handle_input (dgram, &cdr_decoder);
00206 
00207       if (result == 0)
00208         // No data to act on.
00209         {
00210           return 0;
00211         }
00212       if (result == -1)
00213         {
00214           ACE_ERROR_RETURN ((LM_ERROR,
00215                             "Error receiving multicasted events.\n"),
00216                             0);
00217         }
00218 
00219       this->consumer_proxy_->push (cdr_decoder.events);
00220     }
00221 
00222   catch (const CORBA::Exception& ex)
00223     {
00224       ACE_ERROR ((LM_ERROR,
00225                   "Caught and swallowed EXCEPTION in "
00226                   "ECG_UDP_Receiver::handle_input: %s\n",
00227                   ex._info ().c_str ()));
00228     }
00229   return 0;
00230 }
00231 
00232 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:06 2010 for TAO_RTEvent by  doxygen 1.4.7