CEC_ProxyPushConsumer.cpp

Go to the documentation of this file.
00001 // $Id: CEC_ProxyPushConsumer.cpp 77001 2007-02-12 07:54:49Z johnnyw $
00002 
00003 #include "orbsvcs/CosEvent/CEC_ProxyPushConsumer.h"
00004 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00005 #include "orbsvcs/CosEvent/CEC_ConsumerAdmin.h"
00006 #include "orbsvcs/CosEvent/CEC_ProxyPushSupplier.h"
00007 
00008 #include "ace/Reverse_Lock_T.h"
00009 
00010 #if ! defined (__ACE_INLINE__)
00011 #include "orbsvcs/CosEvent/CEC_ProxyPushConsumer.inl"
00012 #endif /* __ACE_INLINE__ */
00013 
00014 ACE_RCSID (CosEvent,
00015            CEC_ProxyPushConsumer,
00016            "$Id: CEC_ProxyPushConsumer.cpp 77001 2007-02-12 07:54:49Z johnnyw $")
00017 
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019 
// Reverse lock over ACE_Lock.  Per the ACE_Reverse_Lock documentation,
// guarding with it releases the wrapped lock for the guard's scope and
// re-acquires it when the guard is destroyed; connect_push_supplier()
// uses it to call back into the event channel without holding lock_.
00020 typedef ACE_Reverse_Lock<ACE_Lock> TAO_CEC_Unlock;
00021 
// Construct a proxy that belongs to the event channel `ec`.
//
// ec       event channel this proxy reports to; stored, not copied.
// timeout  stored in timeout_; apply_policy() consults it when a supplier
//          is connected.
//
// The proxy starts with a reference count of 1 and in the disconnected
// state.  Its lock and default POA are obtained from the event channel,
// and the proxy registers itself in the channel's servant retry map with
// an initial value of 0.
00022 TAO_CEC_ProxyPushConsumer::
00023 TAO_CEC_ProxyPushConsumer (TAO_CEC_EventChannel* ec,
00024                            const ACE_Time_Value &timeout)
00025   : event_channel_ (ec),
00026     timeout_(timeout),
00027     refcount_ (1),
00028     connected_ (false)
00029 {
00030   this->lock_ =
00031     this->event_channel_->create_consumer_lock ();
00032
00033   this->default_POA_ =
00034     this->event_channel_->consumer_poa ();
00035
00036   this->event_channel_->get_servant_retry_map ().bind (this, 0);
00037 }
00038 
// Undo the constructor's registrations: remove this proxy from the event
// channel's servant retry map, then return lock_ to the channel through
// destroy_consumer_lock().
00039 TAO_CEC_ProxyPushConsumer::~TAO_CEC_ProxyPushConsumer (void)
00040 {
00041   this->event_channel_->get_servant_retry_map ().unbind (this);
00042   this->event_channel_->destroy_consumer_lock (this->lock_);
00043 }
00044 
// Obtain this servant's object reference and hand it to the caller.
//
// activated_proxy  [out] receives the reference returned by _this(), or a
//                  nil reference when _this() raises a CORBA::Exception.
//                  The reference is passed out with _retn(), so the caller
//                  becomes responsible for it.
//
// CORBA exceptions raised by _this() are swallowed; failure is reported
// only through the nil result.
00045 void
00046 TAO_CEC_ProxyPushConsumer::activate (
00047     CosEventChannelAdmin::ProxyPushConsumer_ptr &activated_proxy)
00048 {
00049   CosEventChannelAdmin::ProxyPushConsumer_var result;
00050   try
00051     {
00052       result = this->_this ();
00053     }
00054   catch (const CORBA::Exception&)
00055     {
00056       result  =  CosEventChannelAdmin::ProxyPushConsumer::_nil ();
00057     }
00058   activated_proxy =  result._retn ();
00059 }
00060 
// Deactivate this servant in its default POA.
//
// Looks up the POA through _default_POA(), maps this servant to its
// object id and asks the POA to deactivate that object.  Any
// CORBA::Exception raised along the way is deliberately swallowed.
00061 void
00062 TAO_CEC_ProxyPushConsumer::deactivate (void)
00063 {
00064   try
00065     {
00066       PortableServer::POA_var poa =
00067         this->_default_POA ();
00068       PortableServer::ObjectId_var id =
00069         poa->servant_to_id (this);
00070       poa->deactivate_object (id.in ());
00071     }
00072   catch (const CORBA::Exception&)
00073     {
00074       // Exceptions here should not be propagated.  They usually
00075       // indicate that an object is being disconnected twice, or some
00076       // race condition, but not a fault that the user needs to know
00077       // about.
00078     }
00079 }
00080 
// Ask whether the connected supplier object has ceased to exist.
//
// disconnected  [out] set to true when this proxy is not connected,
//               false otherwise.
//
// Returns false when the proxy is not connected or when no policy-free
// supplier reference is stored.  Otherwise, when TAO_HAS_MINIMUM_CORBA
// is 0, returns the result of _non_existent() on that reference; in
// minimum-CORBA builds it always returns false.
//
// Throws CORBA::INTERNAL when lock_ cannot be acquired.
00081 CORBA::Boolean
00082 TAO_CEC_ProxyPushConsumer::supplier_non_existent (
00083       CORBA::Boolean_out disconnected)
00084 {
00085   CORBA::Object_var supplier;
00086   {
00087     ACE_GUARD_THROW_EX (
00088         ACE_Lock, ace_mon, *this->lock_,
00089         CORBA::INTERNAL ());
00090
00091     disconnected = false;
00092     if (!this->is_connected_i ())
00093       {
00094         disconnected = true;
00095         return false;
00096       }
00097     if (CORBA::is_nil (this->nopolicy_supplier_.in ()))
00098       {
00099         return false;
00100       }
        // Take our own reference while the lock is held; the remote
        // _non_existent() call below runs after the guard has gone out
        // of scope.
00101     supplier = CORBA::Object::_duplicate (this->nopolicy_supplier_.in ());
00102   }
00103
00104 #if (TAO_HAS_MINIMUM_CORBA == 0)
00105   return supplier->_non_existent ();
00106 #else
00107   return false;
00108 #endif /* TAO_HAS_MINIMUM_CORBA */
00109 }
00110 
// Shut this proxy down.
//
// Under lock_, takes over the stored supplier reference and marks the
// proxy disconnected.  With the lock released it deactivates the servant
// and, if a supplier was stored, invokes disconnect_push_supplier() on
// it.  CORBA exceptions from that callback are ignored.
//
// Throws CORBA::INTERNAL when lock_ cannot be acquired.
00111 void
00112 TAO_CEC_ProxyPushConsumer::shutdown (void)
00113 {
00114   CosEventComm::PushSupplier_var supplier;
00115
00116   {
00117     ACE_GUARD_THROW_EX (
00118         ACE_Lock, ace_mon, *this->lock_,
00119         CORBA::INTERNAL ());
00120     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00121
        // _retn() leaves supplier_ empty and moves the reference into the
        // local variable, so the callback below needs no lock.
00122     supplier = this->supplier_._retn ();
00123     this->connected_ = false;
00124   }
00125
00126   this->deactivate ();
00127
00128   if (CORBA::is_nil (supplier.in ()))
00129     return;
00130
00131   try
00132     {
00133       supplier->disconnect_push_supplier ();
00134     }
00135   catch (const CORBA::Exception&)
00136     {
00137       // Ignore exceptions, we must isolate other clients from
00138       // failures on this one.
00139     }
00140 }
00141 
// Reset the connection state of this proxy.
//
// Both callers in this file (connect_push_supplier() and
// disconnect_push_consumer()) invoke it while holding lock_; it takes no
// lock itself.
//
// Drops the policy-applied supplier reference, the policy-free reference
// kept for supplier_non_existent(), and clears the connected flag.
// Releasing nopolicy_supplier_ here keeps a disconnected proxy from
// pinning the old supplier, and keeps a later connection with a nil
// supplier from being checked against the previous supplier's reference.
00142 void
00143 TAO_CEC_ProxyPushConsumer::cleanup_i (void)
00144 {
00145   this->supplier_ = CosEventComm::PushSupplier::_nil ();
00146   this->nopolicy_supplier_ = CosEventComm::PushSupplier::_nil ();
00147   this->connected_ = false;
00148 }
00149 
// Increment the reference count under lock_.
//
// Returns the value the counter had BEFORE the increment (post-increment
// expression), or 0 when lock_ cannot be acquired.
//
// NOTE(review): _decr_refcnt() returns the count after its update, so the
// two methods are not symmetric.  _add_ref() in this file ignores the
// result; confirm no other caller relies on it before changing this.
00150 CORBA::ULong
00151 TAO_CEC_ProxyPushConsumer::_incr_refcnt (void)
00152 {
00153   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00154   return this->refcount_++;
00155 }
00156 
// Decrement the reference count under lock_.
//
// Returns the remaining count while it is non-zero.  When the count
// reaches zero the guard scope is left first, then the event channel is
// asked to destroy this proxy and 0 is returned.  0 is also returned when
// lock_ cannot be acquired, in which case nothing is decremented.
00157 CORBA::ULong
00158 TAO_CEC_ProxyPushConsumer::_decr_refcnt (void)
00159 {
00160   {
00161     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00162     --this->refcount_;
00163     if (this->refcount_ != 0)
00164       return this->refcount_;
00165   }
00166
00167   // Notify the event channel
00168   this->event_channel_->destroy_proxy (this);
00169   return 0;
00170 }
00171 
// Connect a push supplier to this proxy.
//
// push_supplier  supplier reference; it is passed through apply_policy()
//                and the result is stored in supplier_.
//
// Throws CosEventChannelAdmin::AlreadyConnected when the proxy is already
// connected and the channel's supplier_reconnect() is 0, and
// CORBA::INTERNAL when a lock guard cannot be acquired.
//
// When reconnection is allowed, the old state is dropped with cleanup_i(),
// lock_ is released around the channel's disconnected() notification, and
// the method returns without storing push_supplier if the proxy is found
// connected again once the lock is back.  After a successful connect the
// channel's connected() is called with lock_ already released.
00172 void
00173 TAO_CEC_ProxyPushConsumer::connect_push_supplier (
00174       CosEventComm::PushSupplier_ptr push_supplier)
00175 {
00176   {
00177     ACE_GUARD_THROW_EX (
00178         ACE_Lock, ace_mon, *this->lock_,
00179         CORBA::INTERNAL ());
00180     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00181
00182     if (this->is_connected_i ())
00183       {
00184         if (this->event_channel_->supplier_reconnect () == 0)
00185           throw CosEventChannelAdmin::AlreadyConnected ();
00186
00187         // Re-connections are allowed, go ahead and disconnect the
00188         // consumer...
00189         this->cleanup_i ();
00190
00191         // @@ Please read the comments in CEC_ProxyPushSupplier about
00192         //     possible race conditions in this area...
00193         TAO_CEC_Unlock reverse_lock (*this->lock_);
00194
00195         {
            // Guarding the reverse lock releases lock_ for this scope, so
            // the channel callback runs unlocked.
00196           ACE_GUARD_THROW_EX (
00197               TAO_CEC_Unlock, ace_mon, reverse_lock,
00198               CORBA::INTERNAL ());
00199           // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00200
00201           this->event_channel_->disconnected (this);
00202         }
00203
00204         // What if a second thread connected us after this?
00205         if (this->is_connected_i ())
00206           return;
00207       }
00208     this->supplier_ = apply_policy (push_supplier);
00209     this->connected_ = true;
00210   }
00211
00212   // Notify the event channel...
00213   this->event_channel_->connected (this);
00214 }
00215 
// Record the supplier reference and derive the one used for callbacks.
//
// pre  supplier reference supplied by the client; may be nil.
//
// Returns `pre` itself when it is nil.  Otherwise returns a new reference
// the caller owns: with CORBA Messaging enabled and a timeout_ greater
// than zero it is `pre` with a round-trip timeout policy override,
// narrowed back to PushSupplier; in every other case it is a plain
// duplicate of `pre`.
//
// nopolicy_supplier_ is always updated to mirror `pre`, nil included, so
// that supplier_non_existent() never probes a supplier left over from an
// earlier connection.
00216 CosEventComm::PushSupplier_ptr
00217 TAO_CEC_ProxyPushConsumer::apply_policy (CosEventComm::PushSupplier_ptr pre)
00218 {
      // Store before the nil check: duplicating a nil reference yields nil,
      // which clears any reference kept from a previous supplier.
00219   this->nopolicy_supplier_ = CosEventComm::PushSupplier::_duplicate (pre);
00220   if (CORBA::is_nil (pre)) return pre;
00221 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00222   CosEventComm::PushSupplier_var post =
00223     CosEventComm::PushSupplier::_duplicate (pre);
00224   if (this->timeout_ > ACE_Time_Value::zero)
00225     {
00226       CORBA::PolicyList policy_list;
00227       policy_list.length (1);
00228       policy_list[0] = this->event_channel_->
00229         create_roundtrip_timeout_policy (this->timeout_);
00230
00231       CORBA::Object_var post_obj = pre->_set_policy_overrides
00232         (policy_list, CORBA::ADD_OVERRIDE);
00233       post = CosEventComm::PushSupplier::_narrow(post_obj.in ());
00234
          // The override has been applied to post_obj; the policy object
          // itself is no longer needed.
00235       policy_list[0]->destroy ();
00236       policy_list.length (0);
00237     }
00238   return post._retn ();
00239 #else
00240   return CosEventComm::PushSupplier::_duplicate (pre);
00241 #endif /* TAO_HAS_CORBA_MESSAGING */
00242 }
00243 
// Deliver one event from the supplier to the channel.
//
// event  the event payload, forwarded unchanged.
//
// A TAO_CEC_ProxyPushConsumer_Guard holds an extra reference on this proxy
// for the duration of the call.  When the guard reports that it is not
// locked (its constructor found the proxy disconnected), the event is
// dropped silently; otherwise it is handed to the channel's consumer
// admin.
00244 void
00245 TAO_CEC_ProxyPushConsumer::push (const CORBA::Any& event)
00246 {
00247   TAO_CEC_ProxyPushConsumer_Guard ace_mon (this->lock_,
00248                                            this->refcount_,
00249                                            this->event_channel_,
00250                                            this);
00251   if (!ace_mon.locked ())
00252     return;
00253
00254   this->event_channel_->consumer_admin ()->push (event);
00255 }
00256 
// Disconnect the supplier from this proxy.
//
// Throws CORBA::BAD_INV_ORDER when the proxy is not connected and
// CORBA::INTERNAL when lock_ cannot be acquired.
//
// Under lock_ the stored supplier reference is taken over and the
// connection state is reset.  With the lock released the channel is told
// through disconnected().  If a supplier was stored and the channel's
// disconnect_callbacks() is true, disconnect_push_supplier() is invoked on
// it; CORBA exceptions from that callback are ignored.
00257 void
00258 TAO_CEC_ProxyPushConsumer::disconnect_push_consumer ()
00259 {
00260   CosEventComm::PushSupplier_var supplier;
00261
00262   {
00263     ACE_GUARD_THROW_EX (
00264         ACE_Lock, ace_mon, *this->lock_,
00265         CORBA::INTERNAL ());
00266     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00267
00268     if (!this->is_connected_i ())
00269       throw CORBA::BAD_INV_ORDER (); // @@ add user exception?
00270
        // Keep the reference alive locally for the callback made after the
        // lock is released.
00271     supplier = this->supplier_._retn ();
00272
00273     this->cleanup_i ();
00274   }
00275
00276   // Notify the event channel...
00277   this->event_channel_->disconnected (this);
00278
00279   if (CORBA::is_nil (supplier.in ()))
00280     return;
00281
00282   if (this->event_channel_->disconnect_callbacks ())
00283     {
00284       try
00285         {
00286           supplier->disconnect_push_supplier ();
00287         }
00288       catch (const CORBA::Exception&)
00289         {
00290           // Ignore exceptions, we must isolate other clients from
00291           // failures on this one.
00292         }
00293     }
00294 }
00295 
// Return a duplicated reference to the POA cached in the constructor
// (the event channel's consumer POA).
00296 PortableServer::POA_ptr
00297 TAO_CEC_ProxyPushConsumer::_default_POA (void)
00298 {
00299   return PortableServer::POA::_duplicate (this->default_POA_.in ());
00300 }
00301 
// Servant reference-counting hook: forwards to _incr_refcnt() and
// discards the returned count.
00302 void
00303 TAO_CEC_ProxyPushConsumer::_add_ref (void)
00304 {
00305   this->_incr_refcnt ();
00306 }
00307 
// Servant reference-counting hook: forwards to _decr_refcnt(), which asks
// the event channel to destroy this proxy once the count reaches zero.
// The returned count is discarded.
00308 void
00309 TAO_CEC_ProxyPushConsumer::_remove_ref (void)
00310 {
00311   this->_decr_refcnt ();
00312 }
00313 
00314 // ****************************************************************
00315 
// Pin a proxy for the lifetime of this guard.
//
// lock      lock protecting the proxy's state and reference count.
// refcount  the proxy's reference counter, held by reference.
// ec        event channel to notify if the guard ends up dropping the
//           last reference.
// proxy     the proxy being guarded.
//
// While holding `lock`, checks that the proxy is connected; if so it
// increments the reference count and sets locked_.  If the proxy is not
// connected nothing is changed and locked() stays false.  The lock is only
// held inside this constructor, not for the guard's lifetime.
00316 TAO_CEC_ProxyPushConsumer_Guard::
00317     TAO_CEC_ProxyPushConsumer_Guard (ACE_Lock *lock,
00318                                      CORBA::ULong &refcount,
00319                                      TAO_CEC_EventChannel *ec,
00320                                      TAO_CEC_ProxyPushConsumer *proxy)
00321  :   lock_ (lock),
00322      refcount_ (refcount),
00323      event_channel_ (ec),
00324      proxy_ (proxy),
00325      locked_ (false)
00326 {
00327   ACE_Guard<ACE_Lock> ace_mon (*this->lock_);
00328   // If the guard fails there is not much we can do, raising an
00329   // exception is wrong, the client has *no* way to handle that kind
00330   // of error.  Even worse, there is no exception to raise in that
00331   // case.
00332   // @@ Returning something won't work either, the error should be
00333   // logged though!
00334
00335   if (!proxy->is_connected_i ())
00336     return;
00337
00338   this->locked_ = true;
00339   ++this->refcount_;
00340 }
00341 
// Release the reference taken by the constructor, if one was taken.
//
// Does nothing when locked_ is false.  Otherwise decrements the reference
// count while holding the lock; when the count reaches zero, the lock
// guard's scope is left first and then the event channel is asked to
// destroy the proxy.
00342 TAO_CEC_ProxyPushConsumer_Guard::
00343     ~TAO_CEC_ProxyPushConsumer_Guard (void)
00344 {
00345   // This access is safe because guard objects are created on the
00346   // stack, only one thread has access to them
00347   if (!this->locked_)
00348     return;
00349
00350   {
00351     ACE_Guard<ACE_Lock> ace_mon (*this->lock_);
00352     // If the guard fails there is not much we can do, raising an
00353     // exception is wrong, the client has *no* way to handle that kind
00354     // of error.  Even worse, there is no exception to raise in that
00355     // case.
00356     // @@ Returning something won't work either, the error should be
00357     // logged though!
00358
00359     --this->refcount_;
00360     if (this->refcount_ != 0)
00361       return;
00362   }
00363   this->event_channel_->destroy_proxy (this->proxy_);
00364 }
00365 
00366 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Sun Jan 27 15:37:17 2008 for TAO_CosEvent by doxygen 1.3.6