CEC_ProxyPullConsumer.cpp

Go to the documentation of this file.
00001 // CEC_ProxyPullConsumer.cpp,v 1.19 2006/03/14 06:14:24 jtc Exp
00002 
00003 #include "orbsvcs/CosEvent/CEC_ProxyPullConsumer.h"
00004 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00005 #include "orbsvcs/CosEvent/CEC_ConsumerAdmin.h"
00006 #include "orbsvcs/CosEvent/CEC_SupplierControl.h"
00007 #include "orbsvcs/CosEvent/CEC_ProxyPullSupplier.h"
00008 
00009 #include "ace/Reverse_Lock_T.h"
00010 
00011 #if ! defined (__ACE_INLINE__)
00012 #include "orbsvcs/CosEvent/CEC_ProxyPullConsumer.i"
00013 #endif /* __ACE_INLINE__ */
00014 
00015 ACE_RCSID (CosEvent,
00016            CEC_ProxyPullConsumer,
00017            "CEC_ProxyPullConsumer.cpp,v 1.19 2006/03/14 06:14:24 jtc Exp")
00018 
00019 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00020 
00021 typedef ACE_Reverse_Lock<ACE_Lock> TAO_CEC_Unlock;
00022 
00023 TAO_CEC_ProxyPullConsumer::
00024     TAO_CEC_ProxyPullConsumer (TAO_CEC_EventChannel* ec)
00025   : event_channel_ (ec),
00026     refcount_ (1)
00027 {
00028   this->lock_ =
00029     this->event_channel_->create_consumer_lock ();
00030 
00031   this->default_POA_ =
00032     this->event_channel_->consumer_poa ();
00033 
00034   this->event_channel_->get_servant_retry_map ().bind (this, 0);
00035 }
00036 
00037 TAO_CEC_ProxyPullConsumer::~TAO_CEC_ProxyPullConsumer (void)
00038 {
00039   this->event_channel_->get_servant_retry_map ().unbind (this);
00040   this->event_channel_->destroy_consumer_lock (this->lock_);
00041 }
00042 
00043 void
00044 TAO_CEC_ProxyPullConsumer::activate (
00045     CosEventChannelAdmin::ProxyPullConsumer_ptr &activated_proxy
00046     ACE_ENV_ARG_DECL)
00047   ACE_THROW_SPEC ((CORBA::SystemException))
00048 {
00049   CosEventChannelAdmin::ProxyPullConsumer_var result;
00050   ACE_TRY
00051     {
00052       result = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
00053       ACE_TRY_CHECK;
00054     }
00055   ACE_CATCHANY
00056     {
00057       result = CosEventChannelAdmin::ProxyPullConsumer::_nil ();
00058     }
00059   ACE_ENDTRY;
00060   activated_proxy = result._retn ();
00061 }
00062 
00063 void
00064 TAO_CEC_ProxyPullConsumer::deactivate (ACE_ENV_SINGLE_ARG_DECL)
00065   ACE_THROW_SPEC ((CORBA::SystemException))
00066 {
00067   ACE_TRY
00068     {
00069       PortableServer::POA_var poa =
00070         this->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
00071       ACE_TRY_CHECK;
00072       PortableServer::ObjectId_var id =
00073         poa->servant_to_id (this ACE_ENV_ARG_PARAMETER);
00074       ACE_TRY_CHECK;
00075       poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
00076       ACE_TRY_CHECK;
00077     }
00078   ACE_CATCHANY
00079     {
00080       // Exceptions here should not be propagated.  They usually
00081       // indicate that an object is beign disconnected twice, or some
00082       // race condition, but not a fault that the user needs to know
00083       // about.
00084     }
00085   ACE_ENDTRY;
00086 }
00087 
00088 // NOTE: There is some amount of duplicated code here, but it is
00089 // intentional. Mainly we want to avoid locking overhead when
00090 // possible, thus the code flow is optimized for that case more than
00091 // for small code.
00092 
00093 CORBA::Any*
00094 TAO_CEC_ProxyPullConsumer::try_pull_from_supplier (
00095     CORBA::Boolean_out has_event
00096     ACE_ENV_ARG_DECL)
00097 {
00098   has_event = 0;
00099   CosEventComm::PullSupplier_var supplier;
00100   {
00101     ACE_GUARD_THROW_EX (
00102             ACE_Lock, ace_mon, *this->lock_,
00103             CORBA::INTERNAL ());
00104     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00105     ACE_CHECK_RETURN (0);
00106 
00107     if (this->is_connected_i () == 0)
00108       return 0; // ACE_THROW (CosEventComm::Disconnected ());????
00109 
00110     supplier =
00111       CosEventComm::PullSupplier::_duplicate (this->supplier_.in ());
00112 
00113     // The refcount cannot be zero, because we have at least two
00114     // references,
00115   }
00116 
00117   CORBA::Any_var any;
00118   TAO_CEC_SupplierControl *control =
00119                  this->event_channel_->supplier_control ();
00120 
00121   ACE_TRY
00122     {
00123       any = supplier->try_pull (has_event ACE_ENV_ARG_PARAMETER);
00124       ACE_TRY_CHECK;
00125 
00126       // Inform the control that we got something from the supplier
00127       control->successful_transmission(this);
00128     }
00129   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00130     {
00131       control->supplier_not_exist (this ACE_ENV_ARG_PARAMETER);
00132       ACE_TRY_CHECK;
00133     }
00134   ACE_CATCH (CORBA::SystemException, sysex)
00135     {
00136       control->system_exception (this,
00137                                  sysex
00138                                  ACE_ENV_ARG_PARAMETER);
00139       ACE_TRY_CHECK;
00140     }
00141   ACE_CATCHANY
00142     {
00143       // @@ Should not happen
00144     }
00145   ACE_ENDTRY;
00146   return any._retn ();
00147 }
00148 
00149 CORBA::Any*
00150 TAO_CEC_ProxyPullConsumer::pull_from_supplier (
00151     ACE_ENV_SINGLE_ARG_DECL)
00152 {
00153   CosEventComm::PullSupplier_var supplier;
00154   {
00155     ACE_GUARD_THROW_EX (
00156             ACE_Lock, ace_mon, *this->lock_,
00157             CORBA::INTERNAL ());
00158     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00159     ACE_CHECK_RETURN (0);
00160 
00161     if (this->is_connected_i () == 0)
00162       return 0; // ACE_THROW (CosEventComm::Disconnected ());????
00163 
00164     supplier =
00165       CosEventComm::PullSupplier::_duplicate (this->supplier_.in ());
00166 
00167     // The refcount cannot be zero, because we have at least two
00168     // references,
00169   }
00170 
00171   CORBA::Any_var any;
00172   ACE_TRY
00173     {
00174       any = supplier->pull (ACE_ENV_SINGLE_ARG_PARAMETER);
00175       ACE_TRY_CHECK;
00176     }
00177   ACE_CATCHANY
00178     {
00179       // @@ This is where the policies for misbehaving suppliers
00180       //    should kick in.... for the moment just ignore them.
00181     }
00182   ACE_ENDTRY;
00183   return any._retn ();
00184 }
00185 
00186 CORBA::Boolean
00187 TAO_CEC_ProxyPullConsumer::supplier_non_existent (
00188       CORBA::Boolean_out disconnected
00189       ACE_ENV_ARG_DECL)
00190 {
00191   CORBA::Object_var supplier;
00192   {
00193     ACE_GUARD_THROW_EX (
00194         ACE_Lock, ace_mon, *this->lock_,
00195         CORBA::INTERNAL ());
00196     ACE_CHECK_RETURN (0);
00197 
00198     disconnected = 0;
00199     if (this->is_connected_i () == 0)
00200       {
00201         disconnected = 1;
00202         return 0;
00203       }
00204     if (CORBA::is_nil (this->supplier_.in ()))
00205       {
00206         return 0;
00207       }
00208     supplier = CORBA::Object::_duplicate (this->supplier_.in ());
00209   }
00210 
00211 #if (TAO_HAS_MINIMUM_CORBA == 0)
00212   return supplier->_non_existent (ACE_ENV_SINGLE_ARG_PARAMETER);
00213 #else
00214   return 0;
00215 #endif /* TAO_HAS_MINIMUM_CORBA */
00216 }
00217 
00218 void
00219 TAO_CEC_ProxyPullConsumer::shutdown (ACE_ENV_SINGLE_ARG_DECL)
00220 {
00221   CosEventComm::PullSupplier_var supplier;
00222 
00223   {
00224     ACE_GUARD_THROW_EX (
00225         ACE_Lock, ace_mon, *this->lock_,
00226         CORBA::INTERNAL ());
00227     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00228     ACE_CHECK;
00229 
00230     supplier = this->supplier_._retn ();
00231   }
00232 
00233   this->deactivate (ACE_ENV_SINGLE_ARG_PARAMETER);
00234   ACE_CHECK;
00235 
00236   if (CORBA::is_nil (supplier.in ()))
00237     return;
00238 
00239   ACE_TRY
00240     {
00241       supplier->disconnect_pull_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00242       ACE_TRY_CHECK;
00243     }
00244   ACE_CATCHANY
00245     {
00246       // Ignore exceptions, we must isolate other clients from
00247       // failures on this one.
00248     }
00249   ACE_ENDTRY;
00250 }
00251 
00252 void
00253 TAO_CEC_ProxyPullConsumer::cleanup_i (void)
00254 {
00255   this->supplier_ =
00256     CosEventComm::PullSupplier::_nil ();
00257 }
00258 
00259 CORBA::ULong
00260 TAO_CEC_ProxyPullConsumer::_incr_refcnt (void)
00261 {
00262   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00263   return this->refcount_++;
00264 }
00265 
00266 CORBA::ULong
00267 TAO_CEC_ProxyPullConsumer::_decr_refcnt (void)
00268 {
00269   {
00270     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00271     this->refcount_--;
00272     if (this->refcount_ != 0)
00273       return this->refcount_;
00274   }
00275 
00276   // Notify the event channel
00277   this->event_channel_->destroy_proxy (this);
00278   return 0;
00279 }
00280 
00281 void
00282 TAO_CEC_ProxyPullConsumer::connect_pull_supplier (
00283       CosEventComm::PullSupplier_ptr pull_supplier
00284       ACE_ENV_ARG_DECL)
00285     ACE_THROW_SPEC ((CORBA::SystemException,
00286                      CosEventChannelAdmin::AlreadyConnected))
00287 {
00288   // Nil PullSuppliers are illegal
00289   if (CORBA::is_nil (pull_supplier))
00290     ACE_THROW (CORBA::BAD_PARAM ());
00291 
00292   {
00293     ACE_GUARD_THROW_EX (
00294         ACE_Lock, ace_mon, *this->lock_,
00295         CORBA::INTERNAL ());
00296     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00297     ACE_CHECK;
00298 
00299     if (this->is_connected_i ())
00300       {
00301         if (this->event_channel_->supplier_reconnect () == 0)
00302           ACE_THROW (CosEventChannelAdmin::AlreadyConnected ());
00303 
00304         // Re-connections are allowed, go ahead and disconnect the
00305         // consumer...
00306         this->cleanup_i ();
00307 
00308         // @@ Please read the comments in CEC_ProxyPullSupplier about
00309         //     possible race conditions in this area...
00310         TAO_CEC_Unlock reverse_lock (*this->lock_);
00311 
00312         {
00313           ACE_GUARD_THROW_EX (
00314               TAO_CEC_Unlock, ace_mon, reverse_lock,
00315               CORBA::INTERNAL ());
00316           // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00317           ACE_CHECK;
00318 
00319           this->event_channel_->disconnected (this ACE_ENV_ARG_PARAMETER);
00320           ACE_CHECK;
00321         }
00322 
00323         // What if a second thread connected us after this?
00324         if (this->is_connected_i ())
00325           return;
00326       }
00327     this->supplier_ =
00328       CosEventComm::PullSupplier::_duplicate (pull_supplier);
00329   }
00330 
00331   // Notify the event channel...
00332   this->event_channel_->connected (this ACE_ENV_ARG_PARAMETER);
00333 }
00334 
00335 void
00336 TAO_CEC_ProxyPullConsumer::disconnect_pull_consumer (
00337       ACE_ENV_SINGLE_ARG_DECL)
00338     ACE_THROW_SPEC ((CORBA::SystemException))
00339 {
00340   CosEventComm::PullSupplier_var supplier;
00341 
00342   {
00343     ACE_GUARD_THROW_EX (
00344         ACE_Lock, ace_mon, *this->lock_,
00345         CORBA::INTERNAL ());
00346     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00347     ACE_CHECK;
00348 
00349     if (this->is_connected_i () == 0)
00350       ACE_THROW (CORBA::BAD_INV_ORDER ()); // @@ add user exception?
00351 
00352     supplier = this->supplier_._retn ();
00353 
00354     this->cleanup_i ();
00355   }
00356 
00357   // Notify the event channel...
00358   this->event_channel_->disconnected (this ACE_ENV_ARG_PARAMETER);
00359   ACE_CHECK;
00360 
00361   if (this->event_channel_->disconnect_callbacks ())
00362     {
00363       ACE_TRY
00364         {
00365           supplier->disconnect_pull_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
00366           ACE_TRY_CHECK;
00367         }
00368       ACE_CATCHANY
00369         {
00370           // Ignore exceptions, we must isolate other clients from
00371           // failures on this one.
00372         }
00373       ACE_ENDTRY;
00374     }
00375 }
00376 
00377 PortableServer::POA_ptr
00378 TAO_CEC_ProxyPullConsumer::_default_POA (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00379 {
00380   return PortableServer::POA::_duplicate (this->default_POA_.in ());
00381 }
00382 
00383 void
00384 TAO_CEC_ProxyPullConsumer::_add_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00385 {
00386   this->_incr_refcnt ();
00387 }
00388 
00389 void
00390 TAO_CEC_ProxyPullConsumer::_remove_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00391 {
00392   this->_decr_refcnt ();
00393 }
00394 
00395 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:18:17 2006 for TAO_CosEvent by doxygen 1.3.6