ECG_UDP_Receiver.cpp

Go to the documentation of this file.
00001 // ECG_UDP_Receiver.cpp,v 1.13 2006/03/14 06:14:25 jtc Exp
00002 
00003 #include "orbsvcs/Event/ECG_UDP_Receiver.h"
00004 #include "orbsvcs/Event_Utilities.h"
00005 #include "ace/SString.h"
00006 
00007 #if !defined(__ACE_INLINE__)
00008 #include "orbsvcs/Event/ECG_UDP_Receiver.i"
00009 #endif /* __ACE_INLINE__ */
00010 
00011 ACE_RCSID (Event,
00012            ECG_UDP_Receiver,
00013            "ECG_UDP_Receiver.cpp,v 1.13 2006/03/14 06:14:25 jtc Exp")
00014 
00015 
00016 // ****************************************************************
00017 
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019 
00020 TAO_ECG_UDP_Receiver::~TAO_ECG_UDP_Receiver (void)
00021 {
00022   this->consumer_proxy_ =
00023     RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00024 
00025   if (this->handler_rptr_.get ())
00026     this->handler_rptr_->shutdown ();
00027 }
00028 
00029 void
00030 TAO_ECG_UDP_Receiver::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec,
00031                             TAO_ECG_Refcounted_Endpoint ignore_from,
00032                             RtecUDPAdmin::AddrServer_ptr addr_server
00033                             ACE_ENV_ARG_DECL)
00034 {
00035   // Verify arguments.
00036   // <addr_server> is allowed to be nil.  But then, if get_addr () method
00037   // is invoked, it throws an exception.
00038   if (CORBA::is_nil (lcl_ec))
00039     {
00040       ACE_ERROR ((LM_ERROR,
00041                   "TAO_ECG_UDP_Receiver::init(): "
00042                   "<lcl_ec> argument is nil.\n"));
00043       ACE_THROW (CORBA::INTERNAL ());
00044     }
00045 
00046   this->cdr_receiver_.init (ignore_from);
00047 
00048   this->lcl_ec_ =
00049     RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec);
00050 
00051   this->addr_server_ =
00052     RtecUDPAdmin::AddrServer::_duplicate (addr_server);
00053 }
00054 
00055 void
00056 TAO_ECG_UDP_Receiver::connect (const RtecEventChannelAdmin::SupplierQOS& pub
00057                                ACE_ENV_ARG_DECL)
00058 {
00059   if (CORBA::is_nil (this->lcl_ec_.in ()))
00060     {
00061       ACE_ERROR ((LM_ERROR,
00062                   "Error initializing TAO_ECG_UDP_Receiver: "
00063                   "init() hasn't been called before connect().\n"));
00064       ACE_THROW (CORBA::INTERNAL ());
00065     }
00066 
00067   if (pub.publications.length () == 0)
00068     {
00069       ACE_ERROR ((LM_ERROR,
00070                   "TAO_ECG_UDP_Receiver::connect(): "
00071                   "0-length publications argument.\n"));
00072       ACE_THROW (CORBA::INTERNAL ());
00073     }
00074 
00075   if (CORBA::is_nil (this->consumer_proxy_.in ()))
00076     {
00077       this->new_connect (pub ACE_ENV_ARG_PARAMETER);
00078       ACE_CHECK;
00079     }
00080   else
00081     {
00082       this->reconnect (pub ACE_ENV_ARG_PARAMETER);
00083       ACE_CHECK;
00084     }
00085 }
00086 
00087 void
00088 TAO_ECG_UDP_Receiver::new_connect (const RtecEventChannelAdmin::SupplierQOS& pub
00089                                    ACE_ENV_ARG_DECL)
00090 {
00091   // Activate with poa.
00092   RtecEventComm::PushSupplier_var supplier_ref;
00093   PortableServer::POA_var poa = this->_default_POA ();
00094 
00095   TAO_EC_Object_Deactivator deactivator;
00096   activate (supplier_ref,
00097             poa.in (),
00098             this,
00099             deactivator
00100             ACE_ENV_ARG_PARAMETER);
00101   ACE_CHECK;
00102 
00103   // Connect as a supplier to the local EC.
00104   RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
00105     this->lcl_ec_->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
00106   ACE_CHECK;
00107 
00108   RtecEventChannelAdmin::ProxyPushConsumer_var proxy =
00109     supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00110   ACE_CHECK;
00111   ECG_Receiver_Auto_Proxy_Disconnect new_proxy_disconnect (proxy.in ());
00112 
00113   proxy->connect_push_supplier (supplier_ref.in (),
00114                                 pub
00115                                 ACE_ENV_ARG_PARAMETER);
00116   ACE_CHECK;
00117 
00118   // Update resource managers.
00119   this->consumer_proxy_ = proxy._retn ();
00120   this->auto_proxy_disconnect_.set_command (new_proxy_disconnect);
00121   this->set_deactivator (deactivator);
00122 }
00123 
00124 void
00125 TAO_ECG_UDP_Receiver::reconnect (const RtecEventChannelAdmin::SupplierQOS& pub
00126                                  ACE_ENV_ARG_DECL)
00127 {
00128   // Obtain our object reference from the POA.
00129   RtecEventComm::PushSupplier_var supplier_ref;
00130   PortableServer::POA_var poa = this->_default_POA ();
00131 
00132   CORBA::Object_var obj = poa->servant_to_reference (this ACE_ENV_ARG_PARAMETER);
00133   ACE_CHECK;
00134   supplier_ref =
00135     RtecEventComm::PushSupplier::_narrow (obj.in () ACE_ENV_ARG_PARAMETER);
00136   ACE_CHECK;
00137 
00138   if (CORBA::is_nil (supplier_ref.in ()))
00139     {
00140       ACE_THROW (CORBA::INTERNAL ());
00141     }
00142 
00143   // Reconnect.
00144   this->consumer_proxy_->connect_push_supplier (supplier_ref.in (),
00145                                                 pub
00146                                                 ACE_ENV_ARG_PARAMETER);
00147   ACE_CHECK;
00148 }
00149 
00150 void
00151 TAO_ECG_UDP_Receiver::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL)
00152     ACE_THROW_SPEC ((CORBA::SystemException))
00153 {
00154   // Prevent attempts to disconnect.
00155   this->auto_proxy_disconnect_.disallow_command ();
00156 
00157   this->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00158 }
00159 
00160 void
00161 TAO_ECG_UDP_Receiver::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00162 {
00163   if (this->handler_rptr_.get ())
00164     this->handler_rptr_->shutdown ();
00165   TAO_ECG_Refcounted_Handler empty_handler_rptr;
00166   this->handler_rptr_ = empty_handler_rptr;
00167 
00168   this->consumer_proxy_ =
00169     RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00170 
00171   this->auto_proxy_disconnect_.execute ();
00172 
00173   this->deactivator_.deactivate ();
00174 
00175   this->cdr_receiver_.shutdown ();
00176 }
00177 
00178 // Helper class for using <cdr_receiver_>.
00179 class TAO_ECG_Event_CDR_Decoder: public TAO_ECG_CDR_Processor
00180 {
00181 public:
00182   virtual int decode (TAO_InputCDR &cdr);
00183 
00184   RtecEventComm::EventSet events;
00185 };
00186 
00187 int
00188 TAO_ECG_Event_CDR_Decoder::decode (TAO_InputCDR &cdr)
00189 {
00190   if (!(cdr >> this->events))
00191     {
00192       ACE_ERROR_RETURN ((LM_ERROR,
00193                          "Error decoding events cdr.\n"),
00194                         -1);
00195     }
00196   return 0;
00197 }
00198 
00199 int
00200 TAO_ECG_UDP_Receiver::handle_input (ACE_SOCK_Dgram& dgram)
00201 {
00202   ACE_DECLARE_NEW_CORBA_ENV;
00203   ACE_TRY
00204     {
00205       // Make sure we are connected to the Event Channel before proceeding
00206       // any further.
00207       if (CORBA::is_nil (this->consumer_proxy_.in ()))
00208         {
00209           ACE_ERROR ((LM_ERROR,
00210                       "TAO_ECG_UDP_Receiver::handle_input() "
00211                       "called but the Receiver is not connected "
00212                       "to an event channel. Shutting down the Receiver.\n"));
00213           this->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00214           ACE_TRY_CHECK;
00215 
00216           return 0;
00217         }
00218 
00219       // Receive data.
00220       TAO_ECG_Event_CDR_Decoder cdr_decoder;
00221       int result = this->cdr_receiver_.handle_input (dgram, &cdr_decoder);
00222 
00223       if (result == 0)
00224         // No data to act on.
00225         {
00226           return 0;
00227         }
00228       if (result == -1)
00229         {
00230           ACE_ERROR_RETURN ((LM_ERROR,
00231                             "Error receiving multicasted events.\n"),
00232                             0);
00233         }
00234 
00235       this->consumer_proxy_->push (cdr_decoder.events ACE_ENV_ARG_PARAMETER);
00236       ACE_TRY_CHECK;
00237     }
00238 
00239   ACE_CATCHANY
00240     {
00241       ACE_DEBUG ((LM_ERROR,
00242                   "Caught and swallowed EXCEPTION in "
00243                   "ECG_UDP_Receiver::handle_input: %s\n",
00244                   ACE_ANY_EXCEPTION._info ().c_str ()));
00245     }
00246   ACE_ENDTRY;
00247   return 0;
00248 }
00249 
00250 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:11:11 2006 for TAO_RTEvent by doxygen 1.3.6