CEC_ProxyPushConsumer.cpp

Go to the documentation of this file.
00001 // CEC_ProxyPushConsumer.cpp,v 1.20 2006/04/19 12:36:57 jwillemsen Exp
00002 
00003 #include "orbsvcs/CosEvent/CEC_ProxyPushConsumer.h"
00004 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00005 #include "orbsvcs/CosEvent/CEC_ConsumerAdmin.h"
00006 #include "orbsvcs/CosEvent/CEC_ProxyPushSupplier.h"
00007 
00008 #include "ace/Reverse_Lock_T.h"
00009 
00010 #if ! defined (__ACE_INLINE__)
00011 #include "orbsvcs/CosEvent/CEC_ProxyPushConsumer.i"
00012 #endif /* __ACE_INLINE__ */
00013 
00014 ACE_RCSID (CosEvent,
00015            CEC_ProxyPushConsumer,
00016            "CEC_ProxyPushConsumer.cpp,v 1.20 2006/04/19 12:36:57 jwillemsen Exp")
00017 
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019 
00020 typedef ACE_Reverse_Lock<ACE_Lock> TAO_CEC_Unlock;
00021 
00022 TAO_CEC_ProxyPushConsumer::
00023     TAO_CEC_ProxyPushConsumer (TAO_CEC_EventChannel* ec)
00024   : event_channel_ (ec),
00025     refcount_ (1),
00026     connected_ (false)
00027 {
00028   this->lock_ =
00029     this->event_channel_->create_consumer_lock ();
00030 
00031   this->default_POA_ =
00032     this->event_channel_->consumer_poa ();
00033 
00034   this->event_channel_->get_servant_retry_map ().bind (this, 0);
00035 }
00036 
00037 TAO_CEC_ProxyPushConsumer::~TAO_CEC_ProxyPushConsumer (void)
00038 {
00039   this->event_channel_->get_servant_retry_map ().unbind (this);
00040   this->event_channel_->destroy_consumer_lock (this->lock_);
00041 }
00042 
00043 void
00044 TAO_CEC_ProxyPushConsumer::activate (
00045     CosEventChannelAdmin::ProxyPushConsumer_ptr &activated_proxy
00046     ACE_ENV_ARG_DECL)
00047   ACE_THROW_SPEC ((CORBA::SystemException))
00048 {
00049   CosEventChannelAdmin::ProxyPushConsumer_var result;
00050   ACE_TRY
00051     {
00052       result = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
00053       ACE_TRY_CHECK;
00054     }
00055   ACE_CATCHANY
00056     {
00057       result  =  CosEventChannelAdmin::ProxyPushConsumer::_nil ();
00058     }
00059   ACE_ENDTRY;
00060   activated_proxy =  result._retn ();
00061 }
00062 
00063 void
00064 TAO_CEC_ProxyPushConsumer::deactivate (ACE_ENV_SINGLE_ARG_DECL)
00065   ACE_THROW_SPEC ((CORBA::SystemException))
00066 {
00067   ACE_TRY
00068     {
00069       PortableServer::POA_var poa =
00070         this->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
00071       ACE_TRY_CHECK;
00072       PortableServer::ObjectId_var id =
00073         poa->servant_to_id (this ACE_ENV_ARG_PARAMETER);
00074       ACE_TRY_CHECK;
00075       poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
00076       ACE_TRY_CHECK;
00077     }
00078   ACE_CATCHANY
00079     {
00080       // Exceptions here should not be propagated.  They usually
00081       // indicate that an object is beign disconnected twice, or some
00082       // race condition, but not a fault that the user needs to know
00083       // about.
00084     }
00085   ACE_ENDTRY;
00086 }
00087 
00088 CORBA::Boolean
00089 TAO_CEC_ProxyPushConsumer::supplier_non_existent (
00090       CORBA::Boolean_out disconnected
00091       ACE_ENV_ARG_DECL)
00092 {
00093   CORBA::Object_var supplier;
00094   {
00095     ACE_GUARD_THROW_EX (
00096         ACE_Lock, ace_mon, *this->lock_,
00097         CORBA::INTERNAL ());
00098     ACE_CHECK_RETURN (0);
00099 
00100     disconnected = false;
00101     if (!this->is_connected_i ())
00102       {
00103         disconnected = true;
00104         return false;
00105       }
00106     if (CORBA::is_nil (this->supplier_.in ()))
00107       {
00108         return false;
00109       }
00110     supplier = CORBA::Object::_duplicate (this->supplier_.in ());
00111   }
00112 
00113 #if (TAO_HAS_MINIMUM_CORBA == 0)
00114   return supplier->_non_existent (ACE_ENV_SINGLE_ARG_PARAMETER);
00115 #else
00116   return false;
00117 #endif /* TAO_HAS_MINIMUM_CORBA */
00118 }
00119 
00120 void
00121 TAO_CEC_ProxyPushConsumer::shutdown (ACE_ENV_SINGLE_ARG_DECL)
00122 {
00123   CosEventComm::PushSupplier_var supplier;
00124 
00125   {
00126     ACE_GUARD_THROW_EX (
00127         ACE_Lock, ace_mon, *this->lock_,
00128         CORBA::INTERNAL ());
00129     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00130     ACE_CHECK;
00131 
00132     supplier = this->supplier_._retn ();
00133     this->connected_ = false;
00134   }
00135 
00136   this->deactivate (ACE_ENV_SINGLE_ARG_PARAMETER);
00137   ACE_CHECK;
00138 
00139   if (CORBA::is_nil (supplier.in ()))
00140     return;
00141 
00142   ACE_TRY
00143     {
00144       supplier->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00145       ACE_TRY_CHECK;
00146     }
00147   ACE_CATCHANY
00148     {
00149       // Ignore exceptions, we must isolate other clients from
00150       // failures on this one.
00151     }
00152   ACE_ENDTRY;
00153 }
00154 
00155 void
00156 TAO_CEC_ProxyPushConsumer::cleanup_i (void)
00157 {
00158   this->supplier_ =
00159     CosEventComm::PushSupplier::_nil ();
00160   this->connected_ = false;
00161 }
00162 
00163 CORBA::ULong
00164 TAO_CEC_ProxyPushConsumer::_incr_refcnt (void)
00165 {
00166   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00167   return this->refcount_++;
00168 }
00169 
00170 CORBA::ULong
00171 TAO_CEC_ProxyPushConsumer::_decr_refcnt (void)
00172 {
00173   {
00174     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00175     --this->refcount_;
00176     if (this->refcount_ != 0)
00177       return this->refcount_;
00178   }
00179 
00180   // Notify the event channel
00181   this->event_channel_->destroy_proxy (this);
00182   return 0;
00183 }
00184 
00185 void
00186 TAO_CEC_ProxyPushConsumer::connect_push_supplier (
00187       CosEventComm::PushSupplier_ptr push_supplier
00188       ACE_ENV_ARG_DECL)
00189     ACE_THROW_SPEC ((CORBA::SystemException,
00190                      CosEventChannelAdmin::AlreadyConnected))
00191 {
00192   {
00193     ACE_GUARD_THROW_EX (
00194         ACE_Lock, ace_mon, *this->lock_,
00195         CORBA::INTERNAL ());
00196     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00197     ACE_CHECK;
00198 
00199     if (this->is_connected_i ())
00200       {
00201         if (this->event_channel_->supplier_reconnect () == 0)
00202           ACE_THROW (CosEventChannelAdmin::AlreadyConnected ());
00203 
00204         // Re-connections are allowed, go ahead and disconnect the
00205         // consumer...
00206         this->cleanup_i ();
00207 
00208         // @@ Please read the comments in CEC_ProxyPushSupplier about
00209         //     possible race conditions in this area...
00210         TAO_CEC_Unlock reverse_lock (*this->lock_);
00211 
00212         {
00213           ACE_GUARD_THROW_EX (
00214               TAO_CEC_Unlock, ace_mon, reverse_lock,
00215               CORBA::INTERNAL ());
00216           // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00217           ACE_CHECK;
00218 
00219           this->event_channel_->disconnected (this ACE_ENV_ARG_PARAMETER);
00220           ACE_CHECK;
00221         }
00222 
00223         // What if a second thread connected us after this?
00224         if (this->is_connected_i ())
00225           return;
00226       }
00227     this->supplier_ =
00228       CosEventComm::PushSupplier::_duplicate (push_supplier);
00229     this->connected_ = true;
00230   }
00231 
00232   // Notify the event channel...
00233   this->event_channel_->connected (this ACE_ENV_ARG_PARAMETER);
00234 }
00235 
00236 void
00237 TAO_CEC_ProxyPushConsumer::push (const CORBA::Any& event
00238                                  ACE_ENV_ARG_DECL)
00239     ACE_THROW_SPEC ((CORBA::SystemException))
00240 {
00241   TAO_CEC_ProxyPushConsumer_Guard ace_mon (this->lock_,
00242                                            this->refcount_,
00243                                            this->event_channel_,
00244                                            this);
00245   if (!ace_mon.locked ())
00246     return;
00247 
00248   this->event_channel_->consumer_admin ()->push (event
00249                                                  ACE_ENV_ARG_PARAMETER);
00250   ACE_CHECK;
00251 }
00252 
00253 void
00254 TAO_CEC_ProxyPushConsumer::disconnect_push_consumer (
00255       ACE_ENV_SINGLE_ARG_DECL)
00256     ACE_THROW_SPEC ((CORBA::SystemException))
00257 {
00258   CosEventComm::PushSupplier_var supplier;
00259 
00260   {
00261     ACE_GUARD_THROW_EX (
00262         ACE_Lock, ace_mon, *this->lock_,
00263         CORBA::INTERNAL ());
00264     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00265     ACE_CHECK;
00266 
00267     if (!this->is_connected_i ())
00268       ACE_THROW (CORBA::BAD_INV_ORDER ()); // @@ add user exception?
00269 
00270     supplier = this->supplier_._retn ();
00271 
00272     this->cleanup_i ();
00273   }
00274 
00275   // Notify the event channel...
00276   this->event_channel_->disconnected (this ACE_ENV_ARG_PARAMETER);
00277   ACE_CHECK;
00278 
00279   if (CORBA::is_nil (supplier.in ()))
00280     return;
00281 
00282   if (this->event_channel_->disconnect_callbacks ())
00283     {
00284       ACE_TRY
00285         {
00286           supplier->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00287           ACE_TRY_CHECK;
00288         }
00289       ACE_CATCHANY
00290         {
00291           // Ignore exceptions, we must isolate other clients from
00292           // failures on this one.
00293         }
00294       ACE_ENDTRY;
00295     }
00296 }
00297 
00298 PortableServer::POA_ptr
00299 TAO_CEC_ProxyPushConsumer::_default_POA (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00300 {
00301   return PortableServer::POA::_duplicate (this->default_POA_.in ());
00302 }
00303 
00304 void
00305 TAO_CEC_ProxyPushConsumer::_add_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00306 {
00307   this->_incr_refcnt ();
00308 }
00309 
00310 void
00311 TAO_CEC_ProxyPushConsumer::_remove_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00312 {
00313   this->_decr_refcnt ();
00314 }
00315 
00316 // ****************************************************************
00317 
00318 TAO_CEC_ProxyPushConsumer_Guard::
00319     TAO_CEC_ProxyPushConsumer_Guard (ACE_Lock *lock,
00320                                      CORBA::ULong &refcount,
00321                                      TAO_CEC_EventChannel *ec,
00322                                      TAO_CEC_ProxyPushConsumer *proxy)
00323  :   lock_ (lock),
00324      refcount_ (refcount),
00325      event_channel_ (ec),
00326      proxy_ (proxy),
00327      locked_ (false)
00328 {
00329   ACE_Guard<ACE_Lock> ace_mon (*this->lock_);
00330   // If the guard fails there is not much we can do, raising an
00331   // exception is wrong, the client has *no* way to handle that kind
00332   // of error.  Even worse, there is no exception to raise in that
00333   // case.
00334   // @@ Returning something won't work either, the error should be
00335   // logged though!
00336 
00337   if (!proxy->is_connected_i ())
00338     return;
00339 
00340   this->locked_ = true;
00341   ++this->refcount_;
00342 }
00343 
00344 TAO_CEC_ProxyPushConsumer_Guard::
00345     ~TAO_CEC_ProxyPushConsumer_Guard (void)
00346 {
00347   // This access is safe because guard objects are created on the
00348   // stack, only one thread has access to them
00349   if (!this->locked_)
00350     return;
00351 
00352   {
00353     ACE_Guard<ACE_Lock> ace_mon (*this->lock_);
00354     // If the guard fails there is not much we can do, raising an
00355     // exception is wrong, the client has *no* way to handle that kind
00356     // of error.  Even worse, there is no exception to raise in that
00357     // case.
00358     // @@ Returning something won't work either, the error should be
00359     // logged though!
00360 
00361     --this->refcount_;
00362     if (this->refcount_ != 0)
00363       return;
00364   }
00365   this->event_channel_->destroy_proxy (this->proxy_);
00366 }
00367 
00368 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:18:17 2006 for TAO_CosEvent by doxygen 1.3.6