EC_Default_ProxyConsumer.cpp

Go to the documentation of this file.
00001 // EC_Default_ProxyConsumer.cpp,v 1.6 2006/03/14 06:14:25 jtc Exp
00002 
00003 #include "orbsvcs/Event/EC_Default_ProxyConsumer.h"
00004 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00005 #include "orbsvcs/Event/EC_Supplier_Filter_Builder.h"
00006 #include "orbsvcs/Event/EC_Supplier_Filter.h"
00007 #include "ace/Synch_T.h"
00008 
00009 ACE_RCSID(Event, EC_Default_ProxyConsumer, "EC_Default_ProxyConsumer.cpp,v 1.6 2006/03/14 06:14:25 jtc Exp")
00010 
00011 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00012 
// Reverse-lock adapter over ACE_Lock (acquiring it releases the wrapped lock
// and vice versa).  connect_push_supplier() guards an instance built from
// lock_ around its call to the event channel's reconnected() hook.
00013 typedef ACE_Reverse_Lock<ACE_Lock> TAO_EC_Unlock;
00014 
// Constructor.  Passes the event channel pointer @a ec on to the
// TAO_EC_ProxyPushConsumer base class; the body itself does nothing.
00015 TAO_EC_Default_ProxyPushConsumer::
00016     TAO_EC_Default_ProxyPushConsumer (TAO_EC_Event_Channel_Base* ec)
00017   : TAO_EC_ProxyPushConsumer (ec)
00018 {
00019 }
00020 
// Destructor.  The body is empty: no explicit cleanup is performed here.
00021 TAO_EC_Default_ProxyPushConsumer::~TAO_EC_Default_ProxyPushConsumer (void)
00022 {
00023 }
00024 
// Connect @a push_supplier to this proxy.
//
// A duplicate of @a push_supplier is stored in supplier_, @a qos is copied
// into qos_, a supplier filter is created from that QoS and bound to this
// proxy, and finally the event channel is told about the new connection
// through connected().
//
// If the proxy is already connected, the outcome depends on the event
// channel's supplier_reconnect() setting: when it is 0 the call raises
// RtecEventChannelAdmin::AlreadyConnected; otherwise the old connection is
// cleaned up with cleanup_i() and the channel's reconnected() hook is
// invoked.  Should the proxy turn out to be connected again after that hook
// returns, the call returns without installing @a push_supplier.
//
// Raises CORBA::INTERNAL if one of the guards cannot acquire its lock.
00025 void
00026 TAO_EC_Default_ProxyPushConsumer::connect_push_supplier (
00027       RtecEventComm::PushSupplier_ptr push_supplier,
00028       const RtecEventChannelAdmin::SupplierQOS& qos
00029       ACE_ENV_ARG_DECL)
00030   ACE_THROW_SPEC ((CORBA::SystemException,
00031                    RtecEventChannelAdmin::AlreadyConnected))
00032 {
00033   {
00034     ACE_GUARD_THROW_EX (
00035         ACE_Lock, ace_mon, *this->lock_,
00036         CORBA::INTERNAL ());
00037     // @@ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00038     ACE_CHECK;
00039 
00040     if (this->is_connected_i ())
00041       {
00042         if (this->event_channel_->supplier_reconnect () == 0)
00043           ACE_THROW (RtecEventChannelAdmin::AlreadyConnected ());
00044 
00045         // Re-connections are allowed, go ahead and clean up the
00046         // existing connection first...
00047         this->cleanup_i ();
00048 
00049         // @@ Please read the comments in EC_ProxySuppliers about
00050         //     possible race conditions in this area...
00051         TAO_EC_Unlock reverse_lock (*this->lock_);
00052 
00053         {
                // Guarding the reverse lock releases lock_ for this inner
                // scope, so reconnected() runs without the proxy lock held.
00054           ACE_GUARD_THROW_EX (
00055               TAO_EC_Unlock, ace_mon, reverse_lock,
00056               CORBA::INTERNAL ());
00057           // @@ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00058           ACE_CHECK;
00059 
00060           this->event_channel_->reconnected (this ACE_ENV_ARG_PARAMETER);
00061           ACE_CHECK;
00062         }
00063 
00064         // A separate thread could have connected simultaneously,
00065         // this is probably an application error, handle it as
00066         // gracefully as possible
00067         if (this->is_connected_i ())
00068           return; // @@ Should we throw
00069       }
00070 
00071     this->supplier_ =
00072       RtecEventComm::PushSupplier::_duplicate (push_supplier);
00073     this->connected_ = 1;
00074     this->qos_ = qos;
00075 
00076 #if TAO_EC_ENABLE_DEBUG_MESSAGES
          // %@ is the ACE_Log_Msg conversion for pointers; %x is an integer
          // conversion and must not be fed the pointer `this'.
00077     ACE_DEBUG ((LM_DEBUG,
00078                  "Building filter for supplier <%@>.\n",
00079                 this));
00080 #endif /* TAO_EC_ENABLE_DEBUG_MESSAGES */
00081     this->filter_ =
00082       this->event_channel_->supplier_filter_builder ()->create (this->qos_);
00083     this->filter_->bind (this);
00084   }
00085 
00086   // Notify the event channel...
00087   this->event_channel_->connected (this ACE_ENV_ARG_PARAMETER);
00088 }
00089 
// Hand the event set @a event to the supplier filter exposed by the guard.
//
// A TAO_EC_ProxyPushConsumer_Guard is built from this proxy's lock,
// reference count and event channel plus the proxy itself.  When the guard
// reports that it is not locked, the events are dropped and the call returns
// normally; otherwise they are pushed through the guard's filter together
// with a pointer to this proxy.
00090 void
00091 TAO_EC_Default_ProxyPushConsumer::push (
00092     const RtecEventComm::EventSet& event
00093     ACE_ENV_ARG_DECL)
00094   ACE_THROW_SPEC ((CORBA::SystemException))
00095 {
00096   TAO_EC_ProxyPushConsumer_Guard proxy_guard (
00097       this->lock_, this->refcount_, this->event_channel_, this);
00098 
00099   // Only dispatch when the guard reports that it is locked.
00100   if (proxy_guard.locked ())
00101     {
00102       proxy_guard.filter->push (event, this ACE_ENV_ARG_PARAMETER);
00103       ACE_CHECK;
00104     }
00105 }
00106 
// Disconnect the supplier currently attached to this proxy.
//
// While holding lock_: remembers whether the proxy was connected, takes the
// stored supplier reference out of supplier_ (via _retn()), clears
// connected_, and calls cleanup_i() if the proxy had been connected.
//
// After the guarded scope ends: the event channel's disconnected() hook is
// always invoked.  If a supplier reference had been stored and the channel's
// disconnect_callbacks() returns non-zero, disconnect_push_supplier() is
// called on that supplier and any exception it raises is swallowed.
//
// Raises CORBA::INTERNAL if the guard cannot acquire lock_.
00107 void
00108 TAO_EC_Default_ProxyPushConsumer::disconnect_push_consumer (
00109       ACE_ENV_SINGLE_ARG_DECL)
00110     ACE_THROW_SPEC ((CORBA::SystemException))
00111 {
00112   RtecEventComm::PushSupplier_var supplier;
00113   int connected = 0;
00114 
00115   {
00116     ACE_GUARD_THROW_EX (
00117         ACE_Lock, ace_mon, *this->lock_,
00118         CORBA::INTERNAL ());
00119     // @@ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00120     ACE_CHECK;
00121 
          // Capture the connection state before it is reset below.
00122     connected = this->is_connected_i ();
          // _retn() hands the reference to the local _var, so it can still be
          // used for the callback after the guarded scope ends.
00123     supplier = this->supplier_._retn ();
00124     this->connected_ = 0;
00125 
00126     if (connected)
00127       this->cleanup_i ();
00128   }
00129 
00130   // Notify the event channel...
00131   this->event_channel_->disconnected (this ACE_ENV_ARG_PARAMETER);
00132   ACE_CHECK;
00133 
        // No supplier reference was stored, so there is nobody to call back.
00134   if (CORBA::is_nil (supplier.in ()))
00135     {
00136       return;
00137     }
00138 
00139   if (this->event_channel_->disconnect_callbacks ())
00140     {
00141       ACE_TRY
00142         {
00143           supplier->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00144           ACE_TRY_CHECK;
00145         }
00146       ACE_CATCHANY
00147         {
00148           // Ignore exceptions, we must isolate other clients from
00149           // failures on this one.
00150         }
00151       ACE_ENDTRY;
00152     }
00153 }
00154 
// Return a duplicated reference to the POA held in default_POA_.
// The environment argument is declared but not used.
00155 PortableServer::POA_ptr
00156 TAO_EC_Default_ProxyPushConsumer::_default_POA (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00157 {
00158   return PortableServer::POA::_duplicate (this->default_POA_.in ());
00159 }
00160 
// Forwards to _incr_refcnt(); anything that call returns is discarded.
// The environment argument is declared but not used.
00161 void
00162 TAO_EC_Default_ProxyPushConsumer::_add_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00163 {
00164   this->_incr_refcnt ();
00165 }
00166 
// Forwards to _decr_refcnt(); anything that call returns is discarded.
// The environment argument is declared but not used.
00167 void
00168 TAO_EC_Default_ProxyPushConsumer::_remove_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00169 {
00170   this->_decr_refcnt ();
00171 }
00172 
// Store the object reference obtained from _this() in the out-parameter
// @a proxy.
//
// Note that @a proxy is assigned before the exception check, so it receives
// whatever _this() returned even when that call reported an exception.
00173 void
00174 TAO_EC_Default_ProxyPushConsumer::activate (
00175    RtecEventChannelAdmin::ProxyPushConsumer_ptr &proxy
00176    ACE_ENV_ARG_DECL)
00177   ACE_THROW_SPEC ((CORBA::SystemException))
00178 {
00179   proxy = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
00180   ACE_CHECK;
00181 }
00182 
// Return, by value, the ObjectId that default_POA_ reports for this servant
// (looked up with servant_to_id()).
//
// If servant_to_id() reports an exception, an empty ObjectId is returned and
// the exception is left for the caller to observe.
00183 PortableServer::ObjectId
00184 TAO_EC_Default_ProxyPushConsumer::object_id (ACE_ENV_SINGLE_ARG_DECL)
00185   ACE_THROW_SPEC ((CORBA::SystemException))
00186 {
00187   PortableServer::ObjectId_var result =
00188     this->default_POA_->servant_to_id (this ACE_ENV_ARG_PARAMETER);
        // Without this check a failed lookup under emulated exceptions would
        // fall through and dereference an unset _var in result.in ().
        ACE_CHECK_RETURN (PortableServer::ObjectId ());
00189   return result.in ();
00190 }
00191 
00192 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:11:06 2006 for TAO_RTEvent by doxygen 1.3.6