CEC_TypedProxyPushConsumer.cpp

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 //
00003 // $Id: CEC_TypedProxyPushConsumer.cpp 77001 2007-02-12 07:54:49Z johnnyw $
00004 
00005 #include "orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.h"
00006 #include "orbsvcs/CosEvent/CEC_TypedEventChannel.h"
00007 #include "orbsvcs/CosEvent/CEC_TypedConsumerAdmin.h"
00008 #include "orbsvcs/CosEvent/CEC_DynamicImplementation.h"
00009 #include "tao/debug.h"
00010 
00011 #if ! defined (__ACE_INLINE__)
00012 #include "orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.inl"
00013 #endif /* __ACE_INLINE__ */
00014 
00015 #include "ace/Reverse_Lock_T.h"
00016 
00017 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00018 
// Reverse-lock adapter over ACE_Lock.  connect_push_supplier() guards on
// an instance of this type, built from the proxy lock, around its
// callback into the event channel.
00019 typedef ACE_Reverse_Lock<ACE_Lock> TAO_CEC_Unlock;
00020 
00021 // Implementation skeleton constructor
00022 TAO_CEC_TypedProxyPushConsumer::TAO_CEC_TypedProxyPushConsumer
00023 (TAO_CEC_TypedEventChannel* ec, const ACE_Time_Value &timeout)
00024   : typed_event_channel_ (ec),
00025     timeout_ (timeout),
00026     refcount_ (1),
00027     connected_ (0)
00028 {
00029   this->lock_ =
00030     this->typed_event_channel_->create_consumer_lock ();
00031 
00032   this->default_POA_ =
00033     this->typed_event_channel_->typed_consumer_poa ();
00034 
00035   this->typed_event_channel_->get_servant_retry_map ().bind (this, 0);
00036 
00037   // DSI initialization
00038   if (TAO_debug_level >= 10)
00039     {
00040       ACE_DEBUG ((LM_DEBUG,
00041                   ACE_TEXT ("***** Initializing the DSI for the new TypedProxyPushConsumer *****\n")));
00042     }
00043 
00044   this->dsi_impl_ = new
00045     TAO_CEC_DynamicImplementationServer (this->default_POA_.in(),
00046                                          this,
00047                                          this->typed_event_channel_);
00048 
00049   try{
00050     // tempporary fix, should put this into some init function.
00051 
00052     this->oid_ =
00053       this->default_POA_->activate_object (this->dsi_impl_);
00054   }
00055   catch (...){
00056   }
00057 }
00058 
00059 // Implementation skeleton destructor
// Deactivates the DSI servant in the POA, deletes it, removes this
// servant from the channel's servant retry map and finally hands the
// lock back to the channel.
00060 TAO_CEC_TypedProxyPushConsumer::~TAO_CEC_TypedProxyPushConsumer (void)
00061 {
00062   try{
        // NOTE(review): if activate_object() failed in the constructor,
        // oid_ was never assigned -- confirm oid_.in () is safe then.
00063     this->default_POA_->deactivate_object (this->oid_.in ());
00064   }
00065   catch (...){
        // Nothing may escape a destructor; deactivation failures are
        // deliberately ignored.
00066   }
00067 
00068   delete dsi_impl_;
00069 
        // The lock is destroyed last, after the retry-map entry is gone.
00070   this->typed_event_channel_->get_servant_retry_map ().unbind (this);
00071   this->typed_event_channel_->destroy_consumer_lock (this->lock_);
00072 }
00073 
00074 void
00075 TAO_CEC_TypedProxyPushConsumer::activate (
00076     CosTypedEventChannelAdmin::TypedProxyPushConsumer_ptr &activated_proxy)
00077 {
00078   CosTypedEventChannelAdmin::TypedProxyPushConsumer_var result;
00079   try
00080     {
00081       result = this->_this ();
00082     }
00083   catch (const CORBA::Exception&)
00084     {
00085       result = CosTypedEventChannelAdmin::TypedProxyPushConsumer::_nil ();
00086     }
00087   activated_proxy = result._retn ();
00088 }
00089 
00090 void
00091 TAO_CEC_TypedProxyPushConsumer::deactivate (void)
00092 {
00093   try
00094     {
00095       PortableServer::POA_var poa =
00096         this->_default_POA ();
00097       PortableServer::ObjectId_var id =
00098         poa->servant_to_id (this);
00099       poa->deactivate_object (id.in ());
00100     }
00101   catch (const CORBA::Exception&)
00102     {
00103       // Exceptions here should not be propagated.  They usually
00104       // indicate that an object is beign disconnected twice, or some
00105       // race condition, but not a fault that the user needs to know
00106       // about.
00107     }
00108 }
00109 
00110 CORBA::Boolean
00111 TAO_CEC_TypedProxyPushConsumer::supplier_non_existent (
00112       CORBA::Boolean_out disconnected)
00113 {
00114   CORBA::Object_var supplier;
00115   {
00116     ACE_GUARD_THROW_EX (
00117         ACE_Lock, ace_mon, *this->lock_,
00118         CORBA::INTERNAL ());
00119 
00120     disconnected = 0;
00121     if (this->is_connected_i () == 0)
00122       {
00123         disconnected = 1;
00124         return 0;
00125       }
00126     if (CORBA::is_nil (this->nopolicy_typed_supplier_.in ()))
00127       {
00128         return 0;
00129       }
00130     supplier = CORBA::Object::_duplicate
00131       (this->nopolicy_typed_supplier_.in ());
00132   }
00133 
00134 #if (TAO_HAS_MINIMUM_CORBA == 0)
00135   return supplier->_non_existent ();
00136 #else
00137   return 0;
00138 #endif /* TAO_HAS_MINIMUM_CORBA */
00139 }
00140 
00141 void
00142 TAO_CEC_TypedProxyPushConsumer::shutdown (void)
00143 {
00144   CosEventComm::PushSupplier_var supplier;
00145 
00146   {
00147     ACE_GUARD_THROW_EX (
00148         ACE_Lock, ace_mon, *this->lock_,
00149         CORBA::INTERNAL ());
00150     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00151 
00152     supplier = this->typed_supplier_._retn ();
00153     this->connected_ = 0;
00154   }
00155 
00156   this->deactivate ();
00157 
00158   if (CORBA::is_nil (supplier.in ()))
00159     return;
00160 
00161   try
00162     {
00163       supplier->disconnect_push_supplier ();
00164     }
00165   catch (const CORBA::Exception&)
00166     {
00167       // Ignore exceptions, we must isolate other clients from
00168       // failures on this one.
00169     }
00170 }
00171 
00172 void
00173 TAO_CEC_TypedProxyPushConsumer::cleanup_i (void)
00174 {
00175   this->typed_supplier_ =
00176     CosEventComm::PushSupplier::_nil ();
00177   this->connected_ = 0;
00178 }
00179 
00180 CORBA::ULong
00181 TAO_CEC_TypedProxyPushConsumer::_incr_refcnt (void)
00182 {
00183   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00184   return this->refcount_++;
00185 }
00186 
00187 CORBA::ULong
00188 TAO_CEC_TypedProxyPushConsumer::_decr_refcnt (void)
00189 {
00190   {
00191     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00192     this->refcount_--;
00193     if (this->refcount_ != 0)
00194       return this->refcount_;
00195   }
00196 
00197   // Notify the event channel
00198   this->typed_event_channel_->destroy_proxy (this);
00199   return 0;
00200 }
00201 
// Connect a push supplier to this proxy.
//
// Under the proxy lock the supplier reference is stored (after
// apply_policy ()) and the proxy is marked connected; the event channel
// is then notified outside the lock.  If the proxy is already connected
// the call either fails or, when the channel allows supplier
// reconnection, first drops the current connection.
//
// @param push_supplier  Supplier to connect.
// @throw CORBA::INTERNAL if a lock guard cannot be acquired.
// @throw CosEventChannelAdmin::AlreadyConnected if the proxy is already
//        connected and supplier_reconnect () returns 0.
00202 void
00203 TAO_CEC_TypedProxyPushConsumer::connect_push_supplier (
00204       CosEventComm::PushSupplier_ptr push_supplier)
00205 {
00206   {
00207     ACE_GUARD_THROW_EX (
00208         ACE_Lock, ace_mon, *this->lock_,
00209         CORBA::INTERNAL ());
00210     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00211 
00212     if (this->is_connected_i ())
00213       {
00214         if (this->typed_event_channel_->supplier_reconnect () == 0)
00215           throw CosEventChannelAdmin::AlreadyConnected ();
00216 
00217         // Re-connections are allowed, go ahead and disconnect the
00218         // consumer...
00219         this->cleanup_i ();
00220 
00221         // @@ Please read the comments in CEC_ProxyPushSupplier about
00222         //     possible race conditions in this area...
00223         TAO_CEC_Unlock reverse_lock (*this->lock_);
00224 
00225         {
                  // Guard on the reverse lock for the duration of the
                  // disconnected () callback into the channel.
00226           ACE_GUARD_THROW_EX (
00227               TAO_CEC_Unlock, ace_mon, reverse_lock,
00228               CORBA::INTERNAL ());
00229           // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00230 
00231           this->typed_event_channel_->disconnected (this);
00232         }
00233 
00234         // What if a second thread connected us after this?
                // In that case return without storing push_supplier and
                // without calling connected () below.
00235         if (this->is_connected_i ())
00236           return;
00237       }
00238     this->typed_supplier_ = apply_policy (push_supplier);
00239     this->connected_ = 1;
00240   }
00241 
00242   // Notify the event channel...
00243   this->typed_event_channel_->connected (this);
00244 }
00245 
00246 CosEventComm::PushSupplier_ptr
00247 TAO_CEC_TypedProxyPushConsumer::apply_policy
00248   (CosEventComm::PushSupplier_ptr pre)
00249 {
00250   if (CORBA::is_nil(pre)) return pre;
00251   this->nopolicy_typed_supplier_ =
00252     CosEventComm::PushSupplier::_duplicate (pre);
00253   CosEventComm::PushSupplier_var post =
00254     CosEventComm::PushSupplier::_duplicate (pre);
00255   if (this->timeout_ > ACE_Time_Value::zero)
00256     {
00257       CORBA::PolicyList policy_list;
00258       policy_list.length (1);
00259       policy_list[0] = this->typed_event_channel_->
00260         create_roundtrip_timeout_policy (this->timeout_);
00261 
00262       CORBA::Object_var post_obj = pre->_set_policy_overrides
00263         (policy_list, CORBA::ADD_OVERRIDE);
00264       post = CosEventComm::PushSupplier::_narrow(post_obj.in ());
00265 
00266       policy_list[0]->destroy ();
00267       policy_list.length (0);
00268     }
00269   return post._retn ();
00270 }
00271 
// Generic push of a CORBA::Any.  This implementation ignores the event
// and always throws CORBA::NO_IMPLEMENT.
00272 void
00273 TAO_CEC_TypedProxyPushConsumer::push (const CORBA::Any& /* event */)
00274 {
00275   throw CORBA::NO_IMPLEMENT ();
00276 }
00277 
00278 void
00279 TAO_CEC_TypedProxyPushConsumer::disconnect_push_consumer ()
00280 {
00281   CosEventComm::PushSupplier_var supplier;
00282 
00283   {
00284     ACE_GUARD_THROW_EX (
00285         ACE_Lock, ace_mon, *this->lock_,
00286         CORBA::INTERNAL ());
00287     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00288 
00289     if (this->is_connected_i () == 0)
00290       throw CORBA::BAD_INV_ORDER (); // @@ add user exception?
00291 
00292     supplier = this->typed_supplier_._retn ();
00293 
00294     this->cleanup_i ();
00295   }
00296 
00297   // Notify the event channel...
00298   this->typed_event_channel_->disconnected (this);
00299 
00300   if (!CORBA::is_nil (supplier.in ()))
00301     {
00302       if (this->typed_event_channel_->disconnect_callbacks ())
00303         {
00304           try
00305             {
00306               supplier->disconnect_push_supplier ();
00307             }
00308           catch (const CORBA::Exception&)
00309             {
00310               // Ignore exceptions, we must isolate other clients from
00311               // failures on this one.
00312             }
00313         }
00314     }
00315 }
00316 
00317 PortableServer::POA_ptr
00318 TAO_CEC_TypedProxyPushConsumer::_default_POA (void)
00319 {
00320   return PortableServer::POA::_duplicate (this->default_POA_.in ());
00321 }
00322 
// Forwards to _incr_refcnt (); the returned count is discarded.
00323 void
00324 TAO_CEC_TypedProxyPushConsumer::_add_ref (void)
00325 {
00326   this->_incr_refcnt ();
00327 }
00328 
// Forwards to _decr_refcnt (), which asks the event channel to destroy
// this proxy once the count reaches zero; the returned count is
// discarded.
00329 void
00330 TAO_CEC_TypedProxyPushConsumer::_remove_ref (void)
00331 {
00332   this->_decr_refcnt ();
00333 }
00334 
00335 CORBA::Object_ptr
00336 TAO_CEC_TypedProxyPushConsumer::get_typed_consumer (void)
00337 
00338 {
00339   CORBA::Object_var server =
00340          default_POA_->id_to_reference (this->oid_.in ());
00341 
00342   return CORBA::Object::_duplicate (server.in());
00343 }
00344 
00345 void
00346 TAO_CEC_TypedProxyPushConsumer::invoke (const TAO_CEC_TypedEvent& typed_event)
00347 {
00348   TAO_CEC_TypedProxyPushConsumer_Guard ace_mon (this->lock_,
00349                                                 this->refcount_,
00350                                                 this->typed_event_channel_,
00351                                                 this);
00352   if (!ace_mon.locked ())
00353     return;
00354 
00355   this->typed_event_channel_->typed_consumer_admin ()->invoke (typed_event);
00356 }
00357 
00358 // ****************************************************************
00359 
00360 TAO_CEC_TypedProxyPushConsumer_Guard::
00361     TAO_CEC_TypedProxyPushConsumer_Guard (ACE_Lock *lock,
00362                                           CORBA::ULong &refcount,
00363                                           TAO_CEC_TypedEventChannel *ec,
00364                                           TAO_CEC_TypedProxyPushConsumer *proxy)
00365  :   lock_ (lock),
00366      refcount_ (refcount),
00367      typed_event_channel_ (ec),
00368      proxy_ (proxy),
00369      locked_ (0)
00370 {
00371   ACE_Guard<ACE_Lock> ace_mon (*this->lock_);
00372   // If the guard fails there is not much we can do, raising an
00373   // exception is wrong, the client has *no* way to handle that kind
00374   // of error.  Even worse, there is no exception to raise in that
00375   // case.
00376   // @@ Returning something won't work either, the error should be
00377   // logged though!
00378 
00379   if (proxy->is_connected_i () == 0)
00380     return;
00381 
00382   this->locked_ = 1;
00383   this->refcount_++;
00384 }
00385 
00386 TAO_CEC_TypedProxyPushConsumer_Guard::
00387     ~TAO_CEC_TypedProxyPushConsumer_Guard (void)
00388 {
00389   // This access is safe because guard objects are created on the
00390   // stack, only one thread has access to them
00391   if (!this->locked_)
00392     return;
00393 
00394   {
00395     ACE_Guard<ACE_Lock> ace_mon (*this->lock_);
00396     // If the guard fails there is not much we can do, raising an
00397     // exception is wrong, the client has *no* way to handle that kind
00398     // of error.  Even worse, there is no exception to raise in that
00399     // case.
00400     // @@ Returning something won't work either, the error should be
00401     // logged though!
00402 
00403     this->refcount_--;
00404     if (this->refcount_ != 0)
00405       return;
00406   }
00407   this->typed_event_channel_->destroy_proxy (this->proxy_);
00408 }
00409 
00410 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:51 2010 for TAO_CosEvent by  doxygen 1.4.7