EC_ProxySupplier.cpp

Go to the documentation of this file.
00001 // $Id: EC_ProxySupplier.cpp 77052 2007-02-12 18:59:05Z johnnyw $
00002 
00003 #include "orbsvcs/Event/EC_ProxySupplier.h"
00004 #include "orbsvcs/Event/EC_Dispatching.h"
00005 #include "orbsvcs/Event/EC_Filter_Builder.h"
00006 #include "orbsvcs/Event/EC_QOS_Info.h"
00007 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00008 #include "orbsvcs/Event/EC_Scheduling_Strategy.h"
00009 #include "orbsvcs/Event/EC_ConsumerControl.h"
00010 #include "orbsvcs/Event/EC_SupplierAdmin.h"
00011 #include "orbsvcs/ESF/ESF_RefCount_Guard.h"
00012 #include "orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h"
00013 
00014 #include "ace/Reverse_Lock_T.h"
00015 
00016 #if ! defined (__ACE_INLINE__)
00017 #include "orbsvcs/Event/EC_ProxySupplier.inl"
00018 #endif /* __ACE_INLINE__ */
00019 
00020 ACE_RCSID (Event,
00021            EC_ProxySupplier,
00022            "$Id: EC_ProxySupplier.cpp 77052 2007-02-12 18:59:05Z johnnyw $")
00023 
00024 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00025 
// Reverse lock over ACE_Lock.  push() and push_nocopy() wrap lock_ in
// one of these and guard it so that the proxy lock is released while
// the event is handed to the dispatching strategy.
00026 typedef ACE_Reverse_Lock<ACE_Lock> TAO_EC_Unlock;
00027 
00028 TAO_EC_ProxyPushSupplier::TAO_EC_ProxyPushSupplier (TAO_EC_Event_Channel_Base* ec, int validate_connection)
00029   : event_channel_ (ec),
00030     refcount_ (1),
00031     suspended_ (0),
00032     child_ (0),
00033     consumer_validate_connection_(validate_connection)
00034 {
00035   this->lock_ =
00036     this->event_channel_->create_supplier_lock ();
00037 
00038   this->default_POA_ =
00039     this->event_channel_->supplier_poa ();
00040 }
00041 
00042 TAO_EC_ProxyPushSupplier::~TAO_EC_ProxyPushSupplier (void)
00043 {
00044   this->event_channel_->destroy_supplier_lock (this->lock_);
00045   this->cleanup_i ();
00046 }
00047 
00048 void
00049 TAO_EC_ProxyPushSupplier::connected (TAO_EC_ProxyPushConsumer* consumer)
00050 {
00051   TAO_EC_Scheduling_Strategy *s =
00052     this->event_channel_->scheduling_strategy ();
00053 
00054   s->add_proxy_supplier_dependencies (this,
00055                                       consumer);
00056 }
00057 
00058 void
00059 TAO_EC_ProxyPushSupplier::reconnected (TAO_EC_ProxyPushConsumer* consumer)
00060 {
00061   TAO_EC_Scheduling_Strategy *s =
00062     this->event_channel_->scheduling_strategy ();
00063 
00064   s->add_proxy_supplier_dependencies (this,
00065                                       consumer);
00066 }
00067 
00068 void
00069 TAO_EC_ProxyPushSupplier::disconnected (TAO_EC_ProxyPushConsumer*)
00070 {
00071 }
00072 
00073 void
00074 TAO_EC_ProxyPushSupplier::connected (TAO_EC_ProxyPushSupplier*)
00075 {
00076 }
00077 
00078 void
00079 TAO_EC_ProxyPushSupplier::reconnected (TAO_EC_ProxyPushSupplier*)
00080 {
00081 }
00082 
00083 void
00084 TAO_EC_ProxyPushSupplier::disconnected (TAO_EC_ProxyPushSupplier*)
00085 {
00086 }
00087 
00088 void
00089 TAO_EC_ProxyPushSupplier::shutdown (void)
00090 {
00091   // Save the consumer we where connected to, we need to send a
00092   // disconnect message to it.
00093   RtecEventComm::PushConsumer_var consumer;
00094 
00095   {
00096     ACE_GUARD_THROW_EX (
00097         ACE_Lock, ace_mon, *this->lock_,
00098         RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00099 
00100     int connected = this->is_connected_i ();
00101 
00102     consumer = this->consumer_._retn ();
00103 
00104     if (connected)
00105       this->cleanup_i ();
00106   }
00107 
00108   this->deactivate ();
00109 
00110   if (CORBA::is_nil (consumer.in ()))
00111     return;
00112 
00113   try
00114     {
00115       consumer->disconnect_push_consumer ();
00116     }
00117   catch (const CORBA::Exception&)
00118     {
00119       // Ignore exceptions, we must isolate other clients from
00120       // problems on this one.
00121     }
00122 }
00123 
00124 void
00125 TAO_EC_ProxyPushSupplier::cleanup_i (void)
00126 {
00127   this->consumer_ =
00128     RtecEventComm::PushConsumer::_nil ();
00129 
00130   // @@ Why don't we have a destroy() method in the
00131   // filter_builder?
00132   delete this->child_;
00133   this->child_ = 0;
00134 }
00135 
00136 void
00137 TAO_EC_ProxyPushSupplier::deactivate (void) throw ()
00138 {
00139   try
00140     {
00141       PortableServer::ObjectId id = this->object_id ();
00142       this->default_POA_->deactivate_object (id);
00143     }
00144   catch (const CORBA::Exception&)
00145     {
00146       // Exceptions here should not be propagated.  They usually
00147       // indicate that an object is beign disconnected twice, or some
00148       // race condition, but not a fault that the user needs to know
00149       // about.
00150     }
00151 }
00152 
00153 CORBA::ULong
00154 TAO_EC_ProxyPushSupplier::_incr_refcnt (void)
00155 {
00156   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00157   return this->refcount_++;
00158 }
00159 
00160 void
00161 TAO_EC_ProxyPushSupplier::refcount_zero_hook (void)
00162 {
00163   // Use the event channel
00164   this->event_channel_->destroy_proxy (this);
00165 }
00166 
00167 CORBA::ULong
00168 TAO_EC_ProxyPushSupplier::_decr_refcnt (void)
00169 {
00170   {
00171     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00172     --this->refcount_;
00173     if (this->refcount_ != 0)
00174       return this->refcount_;
00175   }
00176 
00177   this->refcount_zero_hook ();
00178   return 0;
00179 }
00180 
// Guard type built in filter() and filter_nocopy() from the proxy's
// reference count, its event channel and the proxy itself.
// NOTE(review): presumably it destroys the proxy through the event
// channel if the count has dropped to zero when the guard goes out of
// scope -- confirm against ESF_Proxy_RefCount_Guard.h.
00181 typedef TAO_ESF_Proxy_RefCount_Guard<TAO_EC_Event_Channel_Base,TAO_EC_ProxyPushSupplier> Destroy_Guard;
00182 
00183 int
00184 TAO_EC_ProxyPushSupplier::filter (const RtecEventComm::EventSet& event,
00185                                   TAO_EC_QOS_Info& qos_info)
00186 {
00187   Destroy_Guard auto_destroy (this->refcount_,
00188                               this->event_channel_,
00189                               this);
00190 
00191   int result = 0;
00192   {
00193     ACE_GUARD_THROW_EX (
00194             ACE_Lock, ace_mon, *this->lock_,
00195             RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00196 
00197     if (this->is_connected_i () == 0)
00198       return 0;
00199 
00200     result = this->child_->filter (event, qos_info);
00201   }
00202   return result;
00203 }
00204 
00205 int
00206 TAO_EC_ProxyPushSupplier::filter_nocopy (RtecEventComm::EventSet& event,
00207                                          TAO_EC_QOS_Info& qos_info)
00208 {
00209   Destroy_Guard auto_destroy (this->refcount_,
00210                               this->event_channel_,
00211                               this);
00212 
00213   int result = 0;
00214   {
00215     ACE_GUARD_THROW_EX (
00216             ACE_Lock, ace_mon, *this->lock_,
00217             RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00218 
00219     if (this->is_connected_i () == 0)
00220       return 0;
00221 
00222     result = this->child_->filter_nocopy (event, qos_info);
00223   }
00224   return result;
00225 }
00226 
00227 void
00228 TAO_EC_ProxyPushSupplier::push (const RtecEventComm::EventSet& event,
00229                                 TAO_EC_QOS_Info& qos_info)
00230 {
00231   // The mutex is already held by the caller (usually the filter()
00232   // method)
00233   if (this->is_connected_i () == 0)
00234     return; // TAO_THROW (RtecEventComm::Disconnected ());????
00235 
00236   if (this->suspended_ != 0)
00237     return;
00238 
00239   TAO_ESF_RefCount_Guard<CORBA::ULong> ace_mon (this->refcount_);
00240   // The guard will decrement the reference count, notice that the
00241   // reference count can become 0, but this is not the right spot to
00242   // check for that and destroy the object.
00243   // If we did so then we would destroy the object, and consequently
00244   // the mutex, but the mutex is used later when the stack unwinds and
00245   // the filter() method tries to destroy the mutex (that originally
00246   // acquired the mutex in the first place).
00247   // So the correct thing to do is to just decrement the reference
00248   // count and let the filter() method do the destruction.
00249 
00250   RtecEventComm::PushConsumer_var consumer =
00251     RtecEventComm::PushConsumer::_duplicate (this->consumer_.in ());
00252 
00253   this->pre_dispatch_hook (const_cast<RtecEventComm::EventSet&> (event));
00254 
00255   {
00256     // We have to release the lock to avoid dead-locks.
00257     TAO_EC_Unlock reverse_lock (*this->lock_);
00258 
00259     ACE_GUARD_THROW_EX (TAO_EC_Unlock, ace_mon, reverse_lock,
00260                         RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00261 
00262     this->event_channel_->dispatching ()->push (this,
00263                                                 consumer.in (),
00264                                                 event,
00265                                                 qos_info);
00266   }
00267 
00268   if (this->child_ != 0)
00269     this->child_->clear ();
00270 }
00271 
00272 void
00273 TAO_EC_ProxyPushSupplier::pre_dispatch_hook (RtecEventComm::EventSet&)
00274 {
00275 }
00276 
00277 void
00278 TAO_EC_ProxyPushSupplier::push_nocopy (RtecEventComm::EventSet& event,
00279                                        TAO_EC_QOS_Info& qos_info)
00280 {
00281   // The mutex is already held by the caller (usually the filter()
00282   // method)
00283   if (this->is_connected_i () == 0)
00284     return; // TAO_THROW (RtecEventComm::Disconnected ());????
00285 
00286   if (this->suspended_ != 0)
00287     return;
00288 
00289   TAO_ESF_RefCount_Guard<CORBA::ULong> ace_mon (this->refcount_);
00290   // The guard will decrement the reference count, notice that the
00291   // reference count can become 0, but this is not the right spot to
00292   // check for that and destroy the object.
00293   // If we did so then we would destroy the object, and consequently
00294   // the mutex, but the mutex is used later when the stack unwinds and
00295   // the filter() method tries to destroy the mutex (that originally
00296   // acquired the mutex in the first place).
00297   // So the correct thing to do is to just decrement the reference
00298   // count and let the filter() method do the destruction.
00299 
00300   RtecEventComm::PushConsumer_var consumer =
00301     RtecEventComm::PushConsumer::_duplicate (this->consumer_.in ());
00302 
00303   this->pre_dispatch_hook (event);
00304 
00305   {
00306     TAO_EC_Unlock reverse_lock (*this->lock_);
00307 
00308     ACE_GUARD_THROW_EX (TAO_EC_Unlock, ace_mon, reverse_lock,
00309                         RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00310 
00311     this->event_channel_->dispatching ()->push_nocopy (this,
00312                                                        consumer.in (),
00313                                                        event,
00314                                                        qos_info);
00315   }
00316 
00317   if (this->child_ != 0)
00318     this->child_->clear ();
00319 }
00320 
00321 void
00322 TAO_EC_ProxyPushSupplier::push_to_consumer (
00323     RtecEventComm::PushConsumer_ptr consumer,
00324     const RtecEventComm::EventSet& event)
00325 {
00326   {
00327     ACE_GUARD_THROW_EX (
00328             ACE_Lock, ace_mon, *this->lock_,
00329             RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00330 
00331     if (this->is_connected_i () == 0)
00332       return; // ACE_THROW (RtecEventComm::Disconnected ());????
00333 
00334     if (this->suspended_ != 0)
00335       return;
00336   }
00337 
00338   try
00339     {
00340       consumer->push (event);
00341     }
00342   catch (const CORBA::OBJECT_NOT_EXIST&)
00343     {
00344       // Do not report errors for old consumers
00345       // NOTE: The comparison below is not completely correct, it
00346       // could be that the remote consumer and the local consumer are
00347       // in fact the same object, but represented by different objects
00348       // references.  Unfortunately this is not a good spot to invoke
00349       // _is_equivalent(), and that may not give us the desired answer
00350       // anyway.
00351       if (consumer == this->consumer_.in ())
00352         {
00353           TAO_EC_ConsumerControl *control =
00354             this->event_channel_->consumer_control ();
00355 
00356           control->consumer_not_exist (this);
00357         }
00358     }
00359   catch (CORBA::SystemException& sysex)
00360     {
00361       // Do not report errors for old consumers
00362       // NOTE: The comparison below is not completely correct, it
00363       // could be that the remote consumer and the local consumer are
00364       // in fact the same object, but represented by different objects
00365       // references.  Unfortunately this is not a good spot to invoke
00366       // _is_equivalent(), and that may not give us the desired answer
00367       // anyway.
00368       if (consumer == this->consumer_.in ())
00369         {
00370           TAO_EC_ConsumerControl *control =
00371             this->event_channel_->consumer_control ();
00372 
00373           control->system_exception (this,
00374                                      sysex);
00375         }
00376     }
00377   catch (const CORBA::Exception&)
00378     {
00379       // Shouldn't happen, but does not hurt
00380     }
00381 }
00382 
00383 void
00384 TAO_EC_ProxyPushSupplier::reactive_push_to_consumer (
00385     RtecEventComm::PushConsumer_ptr consumer,
00386     const RtecEventComm::EventSet& event)
00387 {
00388   try
00389     {
00390       consumer->push (event);
00391     }
00392   catch (const CORBA::OBJECT_NOT_EXIST&)
00393     {
00394       TAO_EC_ConsumerControl *control =
00395         this->event_channel_->consumer_control ();
00396 
00397       control->consumer_not_exist (this);
00398     }
00399   catch (CORBA::SystemException& sysex)
00400     {
00401       TAO_EC_ConsumerControl *control =
00402         this->event_channel_->consumer_control ();
00403 
00404       control->system_exception (this,
00405                                  sysex);
00406     }
00407   catch (const CORBA::Exception&)
00408     {
00409       // Shouldn't happen
00410     }
00411 }
00412 
00413 CORBA::Boolean
00414 TAO_EC_ProxyPushSupplier::consumer_non_existent (
00415       CORBA::Boolean_out disconnected)
00416 {
00417   CORBA::Object_var consumer;
00418   {
00419     ACE_GUARD_THROW_EX (
00420         ACE_Lock, ace_mon, *this->lock_,
00421         CORBA::INTERNAL ());
00422 
00423     disconnected = 0;
00424     if (this->is_connected_i () == 0)
00425       {
00426         disconnected = 1;
00427         return 0;
00428       }
00429 
00430     consumer = CORBA::Object::_duplicate (this->consumer_.in ());
00431   }
00432 
00433 #if (TAO_HAS_MINIMUM_CORBA == 0)
00434   return consumer->_non_existent ();
00435 #else
00436   return 0;
00437 #endif /* TAO_HAS_MINIMUM_CORBA */
00438 }
00439 
00440 void
00441 TAO_EC_ProxyPushSupplier::clear (void)
00442 {
00443   ACE_GUARD (ACE_Lock, ace_mon, *this->lock_);
00444 
00445   this->child_->clear ();
00446 }
00447 
00448 CORBA::ULong
00449 TAO_EC_ProxyPushSupplier::max_event_size (void) const
00450 {
00451   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00452 
00453   return this->child_->max_event_size ();
00454 }
00455 
00456 int
00457 TAO_EC_ProxyPushSupplier::can_match (
00458       const RtecEventComm::EventHeader &header) const
00459 {
00460   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00461 
00462   if (this->is_connected_i () == 0)
00463     return 0;
00464 
00465   return this->child_->can_match (header);
00466 }
00467 
00468 int
00469 TAO_EC_ProxyPushSupplier::add_dependencies (
00470       const RtecEventComm::EventHeader &header,
00471       const TAO_EC_QOS_Info &qos_info)
00472 {
00473   ACE_GUARD_THROW_EX (
00474           ACE_Lock, ace_mon, *this->lock_,
00475           RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00476 
00477   return this->child_->add_dependencies (header,
00478                                          qos_info);
00479 }
00480 
00481 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:05 2010 for TAO_RTEvent by  doxygen 1.4.7