CEC_ProxyPullConsumer.cpp

Go to the documentation of this file.
00001 // $Id: CEC_ProxyPullConsumer.cpp 80714 2008-02-24 19:45:27Z johnnyw $
00002 
00003 #include "orbsvcs/CosEvent/CEC_ProxyPullConsumer.h"
00004 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00005 #include "orbsvcs/CosEvent/CEC_ConsumerAdmin.h"
00006 #include "orbsvcs/CosEvent/CEC_SupplierControl.h"
00007 #include "orbsvcs/CosEvent/CEC_ProxyPullSupplier.h"
00008 
00009 #include "ace/Reverse_Lock_T.h"
00010 
00011 #if ! defined (__ACE_INLINE__)
00012 #include "orbsvcs/CosEvent/CEC_ProxyPullConsumer.inl"
00013 #endif /* __ACE_INLINE__ */
00014 
00015 ACE_RCSID (CosEvent,
00016            CEC_ProxyPullConsumer,
00017            "$Id: CEC_ProxyPullConsumer.cpp 80714 2008-02-24 19:45:27Z johnnyw $")
00018 
00019 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00020 
// Reverse-lock type layered over ACE_Lock.  connect_pull_supplier() wraps
// the proxy's lock_ in one of these and guards it while it notifies the
// event channel that the previous supplier has been disconnected.
00021 typedef ACE_Reverse_Lock<ACE_Lock> TAO_CEC_Unlock;
00022 
00023 TAO_CEC_ProxyPullConsumer::
00024 TAO_CEC_ProxyPullConsumer (TAO_CEC_EventChannel* ec,
00025                            const ACE_Time_Value &timeout)
00026   : event_channel_ (ec),
00027     timeout_ (timeout),
00028     refcount_ (1)
00029 {
00030   this->lock_ =
00031     this->event_channel_->create_consumer_lock ();
00032 
00033   this->default_POA_ =
00034     this->event_channel_->consumer_poa ();
00035 
00036   this->event_channel_->get_servant_retry_map ().bind (this, 0);
00037 }
00038 
00039 TAO_CEC_ProxyPullConsumer::~TAO_CEC_ProxyPullConsumer (void)
00040 {
00041   this->event_channel_->get_servant_retry_map ().unbind (this);
00042   this->event_channel_->destroy_consumer_lock (this->lock_);
00043 }
00044 
00045 void
00046 TAO_CEC_ProxyPullConsumer::activate (
00047     CosEventChannelAdmin::ProxyPullConsumer_ptr &activated_proxy)
00048 {
00049   CosEventChannelAdmin::ProxyPullConsumer_var result;
00050   try
00051     {
00052       result = this->_this ();
00053     }
00054   catch (const CORBA::Exception&)
00055     {
00056       result = CosEventChannelAdmin::ProxyPullConsumer::_nil ();
00057     }
00058   activated_proxy = result._retn ();
00059 }
00060 
00061 void
00062 TAO_CEC_ProxyPullConsumer::deactivate (void)
00063 {
00064   try
00065     {
00066       PortableServer::POA_var poa =
00067         this->_default_POA ();
00068       PortableServer::ObjectId_var id =
00069         poa->servant_to_id (this);
00070       poa->deactivate_object (id.in ());
00071     }
00072   catch (const CORBA::Exception&)
00073     {
00074       // Exceptions here should not be propagated.  They usually
00075       // indicate that an object is beign disconnected twice, or some
00076       // race condition, but not a fault that the user needs to know
00077       // about.
00078     }
00079 }
00080 
00081 // NOTE: There is some amount of duplicated code here, but it is
00082 // intentional. Mainly we want to avoid locking overhead when
00083 // possible, thus the code flow is optimized for that case more than
00084 // for small code.
00085 
00086 CORBA::Any*
00087 TAO_CEC_ProxyPullConsumer::try_pull_from_supplier (
00088     CORBA::Boolean_out has_event)
00089 {
00090   has_event = 0;
00091   CosEventComm::PullSupplier_var supplier;
00092   {
00093     ACE_GUARD_THROW_EX (
00094             ACE_Lock, ace_mon, *this->lock_,
00095             CORBA::INTERNAL ());
00096     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00097 
00098     if (this->is_connected_i () == 0)
00099       return 0; // ACE_THROW (CosEventComm::Disconnected ());????
00100 
00101     supplier =
00102       CosEventComm::PullSupplier::_duplicate (this->supplier_.in ());
00103 
00104     // The refcount cannot be zero, because we have at least two
00105     // references,
00106   }
00107 
00108   CORBA::Any_var any;
00109   TAO_CEC_SupplierControl *control =
00110                  this->event_channel_->supplier_control ();
00111 
00112   try
00113     {
00114       any = supplier->try_pull (has_event);
00115 
00116       // Inform the control that we got something from the supplier
00117       control->successful_transmission(this);
00118     }
00119   catch (const CORBA::OBJECT_NOT_EXIST&)
00120     {
00121       control->supplier_not_exist (this);
00122     }
00123   catch (CORBA::SystemException& sysex)
00124     {
00125       control->system_exception (this, sysex);
00126     }
00127   catch (const CORBA::Exception&)
00128     {
00129       // @@ Should not happen
00130     }
00131   return any._retn ();
00132 }
00133 
00134 CORBA::Any*
00135 TAO_CEC_ProxyPullConsumer::pull_from_supplier ()
00136 {
00137   CosEventComm::PullSupplier_var supplier;
00138   {
00139     ACE_GUARD_THROW_EX (
00140             ACE_Lock, ace_mon, *this->lock_,
00141             CORBA::INTERNAL ());
00142     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00143 
00144     if (this->is_connected_i () == 0)
00145       return 0; // ACE_THROW (CosEventComm::Disconnected ());????
00146 
00147     supplier =
00148       CosEventComm::PullSupplier::_duplicate (this->supplier_.in ());
00149 
00150     // The refcount cannot be zero, because we have at least two
00151     // references,
00152   }
00153 
00154   CORBA::Any_var any;
00155   try
00156     {
00157       any = supplier->pull ();
00158     }
00159   catch (const CORBA::Exception&)
00160     {
00161       // @@ This is where the policies for misbehaving suppliers
00162       //    should kick in.... for the moment just ignore them.
00163     }
00164   return any._retn ();
00165 }
00166 
00167 CORBA::Boolean
00168 TAO_CEC_ProxyPullConsumer::supplier_non_existent (
00169       CORBA::Boolean_out disconnected)
00170 {
00171   CORBA::Object_var supplier;
00172   {
00173     ACE_GUARD_THROW_EX (
00174         ACE_Lock, ace_mon, *this->lock_,
00175         CORBA::INTERNAL ());
00176 
00177     disconnected = 0;
00178     if (this->is_connected_i () == 0)
00179       {
00180         disconnected = 1;
00181         return 0;
00182       }
00183     if (CORBA::is_nil (this->nopolicy_supplier_.in ()))
00184       {
00185         return 0;
00186       }
00187     supplier = CORBA::Object::_duplicate (this->nopolicy_supplier_.in ());
00188   }
00189 
00190 #if (TAO_HAS_MINIMUM_CORBA == 0)
00191   return supplier->_non_existent ();
00192 #else
00193   return 0;
00194 #endif /* TAO_HAS_MINIMUM_CORBA */
00195 }
00196 
00197 void
00198 TAO_CEC_ProxyPullConsumer::shutdown (void)
00199 {
00200   CosEventComm::PullSupplier_var supplier;
00201 
00202   {
00203     ACE_GUARD_THROW_EX (
00204         ACE_Lock, ace_mon, *this->lock_,
00205         CORBA::INTERNAL ());
00206     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00207 
00208     supplier = this->supplier_._retn ();
00209   }
00210 
00211   this->deactivate ();
00212 
00213   if (CORBA::is_nil (supplier.in ()))
00214     return;
00215 
00216   try
00217     {
00218       supplier->disconnect_pull_supplier ();
00219     }
00220   catch (const CORBA::Exception&)
00221     {
00222       // Ignore exceptions, we must isolate other clients from
00223       // failures on this one.
00224     }
00225 }
00226 
00227 void
00228 TAO_CEC_ProxyPullConsumer::cleanup_i (void)
00229 {
00230   this->supplier_ =
00231     CosEventComm::PullSupplier::_nil ();
00232 }
00233 
00234 CORBA::ULong
00235 TAO_CEC_ProxyPullConsumer::_incr_refcnt (void)
00236 {
00237   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00238   return this->refcount_++;
00239 }
00240 
00241 CORBA::ULong
00242 TAO_CEC_ProxyPullConsumer::_decr_refcnt (void)
00243 {
00244   {
00245     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00246     this->refcount_--;
00247     if (this->refcount_ != 0)
00248       return this->refcount_;
00249   }
00250 
00251   // Notify the event channel
00252   this->event_channel_->destroy_proxy (this);
00253   return 0;
00254 }
00255 
00256 void
00257 TAO_CEC_ProxyPullConsumer::connect_pull_supplier (
00258       CosEventComm::PullSupplier_ptr pull_supplier)
00259 {
00260   // Nil PullSuppliers are illegal
00261   if (CORBA::is_nil (pull_supplier))
00262     throw CORBA::BAD_PARAM ();
00263 
00264   {
00265     ACE_GUARD_THROW_EX (
00266         ACE_Lock, ace_mon, *this->lock_,
00267         CORBA::INTERNAL ());
00268     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00269 
00270     if (this->is_connected_i ())
00271       {
00272         if (this->event_channel_->supplier_reconnect () == 0)
00273           throw CosEventChannelAdmin::AlreadyConnected ();
00274 
00275         // Re-connections are allowed, go ahead and disconnect the
00276         // consumer...
00277         this->cleanup_i ();
00278 
00279         // @@ Please read the comments in CEC_ProxyPullSupplier about
00280         //     possible race conditions in this area...
00281         TAO_CEC_Unlock reverse_lock (*this->lock_);
00282 
00283         {
00284           ACE_GUARD_THROW_EX (
00285               TAO_CEC_Unlock, ace_mon, reverse_lock,
00286               CORBA::INTERNAL ());
00287           // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00288 
00289           this->event_channel_->disconnected (this);
00290         }
00291 
00292         // What if a second thread connected us after this?
00293         if (this->is_connected_i ())
00294           return;
00295       }
00296     this->supplier_ = apply_policy (pull_supplier);
00297   }
00298 
00299   // Notify the event channel...
00300   this->event_channel_->connected (this);
00301 }
00302 
00303 CosEventComm::PullSupplier_ptr
00304 TAO_CEC_ProxyPullConsumer::apply_policy (CosEventComm::PullSupplier_ptr pre)
00305 {
00306   this->nopolicy_supplier_ = CosEventComm::PullSupplier::_duplicate (pre);
00307 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00308   CosEventComm::PullSupplier_var post =
00309     CosEventComm::PullSupplier::_duplicate (pre);
00310   if (this->timeout_ > ACE_Time_Value::zero)
00311     {
00312       CORBA::PolicyList policy_list;
00313       policy_list.length (1);
00314       policy_list[0] = this->event_channel_->
00315         create_roundtrip_timeout_policy (this->timeout_);
00316 
00317       CORBA::Object_var post_obj = pre->_set_policy_overrides
00318         (policy_list, CORBA::ADD_OVERRIDE);
00319       post = CosEventComm::PullSupplier::_narrow (post_obj.in ());
00320 
00321       policy_list[0]->destroy ();
00322       policy_list.length (0);
00323     }
00324   return post._retn ();
00325 #else
00326   return CosEventComm::PullSupplier::_duplicate (pre);
00327 #endif /* TAO_HAS_CORBA_MESSAGING */
00328 }
00329 
00330 void
00331 TAO_CEC_ProxyPullConsumer::disconnect_pull_consumer ()
00332 {
00333   CosEventComm::PullSupplier_var supplier;
00334 
00335   {
00336     ACE_GUARD_THROW_EX (
00337         ACE_Lock, ace_mon, *this->lock_,
00338         CORBA::INTERNAL ());
00339     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00340 
00341     if (this->is_connected_i () == 0)
00342       throw CORBA::BAD_INV_ORDER (); // @@ add user exception?
00343 
00344     supplier = this->supplier_._retn ();
00345 
00346     this->cleanup_i ();
00347   }
00348 
00349   // Notify the event channel...
00350   this->event_channel_->disconnected (this);
00351 
00352   if (this->event_channel_->disconnect_callbacks ())
00353     {
00354       try
00355         {
00356           supplier->disconnect_pull_supplier ();
00357         }
00358       catch (const CORBA::Exception&)
00359         {
00360           // Ignore exceptions, we must isolate other clients from
00361           // failures on this one.
00362         }
00363     }
00364 }
00365 
00366 PortableServer::POA_ptr
00367 TAO_CEC_ProxyPullConsumer::_default_POA (void)
00368 {
00369   return PortableServer::POA::_duplicate (this->default_POA_.in ());
00370 }
00371 
00372 void
00373 TAO_CEC_ProxyPullConsumer::_add_ref (void)
00374 {
00375   this->_incr_refcnt ();
00376 }
00377 
00378 void
00379 TAO_CEC_ProxyPullConsumer::_remove_ref (void)
00380 {
00381   this->_decr_refcnt ();
00382 }
00383 
00384 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:51 2010 for TAO_CosEvent by  doxygen 1.4.7