EC_ProxySupplier.cpp

Go to the documentation of this file.
00001 // EC_ProxySupplier.cpp,v 1.69 2006/03/14 06:14:25 jtc Exp
00002 
00003 #include "orbsvcs/Event/EC_ProxySupplier.h"
00004 #include "orbsvcs/Event/EC_Dispatching.h"
00005 #include "orbsvcs/Event/EC_Filter_Builder.h"
00006 #include "orbsvcs/Event/EC_QOS_Info.h"
00007 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00008 #include "orbsvcs/Event/EC_Scheduling_Strategy.h"
00009 #include "orbsvcs/Event/EC_ConsumerControl.h"
00010 #include "orbsvcs/Event/EC_SupplierAdmin.h"
00011 #include "orbsvcs/ESF/ESF_RefCount_Guard.h"
00012 #include "orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h"
00013 
00014 #include "ace/Reverse_Lock_T.h"
00015 
00016 #if ! defined (__ACE_INLINE__)
00017 #include "orbsvcs/Event/EC_ProxySupplier.i"
00018 #endif /* __ACE_INLINE__ */
00019 
00020 ACE_RCSID (Event,
00021            EC_ProxySupplier,
00022            "EC_ProxySupplier.cpp,v 1.69 2006/03/14 06:14:25 jtc Exp")
00023 
00024 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00025 
00026 typedef ACE_Reverse_Lock<ACE_Lock> TAO_EC_Unlock;
00027 
00028 TAO_EC_ProxyPushSupplier::TAO_EC_ProxyPushSupplier (TAO_EC_Event_Channel_Base* ec, int validate_connection)
00029   : event_channel_ (ec),
00030     refcount_ (1),
00031     suspended_ (0),
00032     child_ (0),
00033     consumer_validate_connection_(validate_connection)
00034 {
00035   this->lock_ =
00036     this->event_channel_->create_supplier_lock ();
00037 
00038   this->default_POA_ =
00039     this->event_channel_->supplier_poa ();
00040 }
00041 
00042 TAO_EC_ProxyPushSupplier::~TAO_EC_ProxyPushSupplier (void)
00043 {
00044   this->event_channel_->destroy_supplier_lock (this->lock_);
00045   this->cleanup_i ();
00046 }
00047 
00048 void
00049 TAO_EC_ProxyPushSupplier::connected (TAO_EC_ProxyPushConsumer* consumer
00050                                      ACE_ENV_ARG_DECL)
00051 {
00052   TAO_EC_Scheduling_Strategy *s =
00053     this->event_channel_->scheduling_strategy ();
00054 
00055   s->add_proxy_supplier_dependencies (this,
00056                                       consumer
00057                                        ACE_ENV_ARG_PARAMETER);
00058 }
00059 
00060 void
00061 TAO_EC_ProxyPushSupplier::reconnected (TAO_EC_ProxyPushConsumer* consumer
00062                                        ACE_ENV_ARG_DECL)
00063 {
00064   TAO_EC_Scheduling_Strategy *s =
00065     this->event_channel_->scheduling_strategy ();
00066 
00067   s->add_proxy_supplier_dependencies (this,
00068                                       consumer
00069                                        ACE_ENV_ARG_PARAMETER);
00070 }
00071 
00072 void
00073 TAO_EC_ProxyPushSupplier::disconnected (TAO_EC_ProxyPushConsumer*
00074                                         ACE_ENV_ARG_DECL_NOT_USED)
00075 {
00076 }
00077 
00078 void
00079 TAO_EC_ProxyPushSupplier::connected (TAO_EC_ProxyPushSupplier*
00080                                      ACE_ENV_ARG_DECL_NOT_USED)
00081 {
00082 }
00083 
00084 void
00085 TAO_EC_ProxyPushSupplier::reconnected (TAO_EC_ProxyPushSupplier*
00086                                      ACE_ENV_ARG_DECL_NOT_USED)
00087 {
00088 }
00089 
00090 void
00091 TAO_EC_ProxyPushSupplier::disconnected (TAO_EC_ProxyPushSupplier*
00092                                         ACE_ENV_ARG_DECL_NOT_USED)
00093 {
00094 }
00095 
00096 void
00097 TAO_EC_ProxyPushSupplier::shutdown (ACE_ENV_SINGLE_ARG_DECL)
00098 {
00099   // Save the consumer we where connected to, we need to send a
00100   // disconnect message to it.
00101   RtecEventComm::PushConsumer_var consumer;
00102 
00103   {
00104     ACE_GUARD_THROW_EX (
00105         ACE_Lock, ace_mon, *this->lock_,
00106         RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00107     ACE_CHECK;
00108 
00109     int connected = this->is_connected_i ();
00110 
00111     consumer = this->consumer_._retn ();
00112 
00113     if (connected)
00114       this->cleanup_i ();
00115   }
00116 
00117   this->deactivate (ACE_ENV_SINGLE_ARG_PARAMETER);
00118   ACE_CHECK;
00119 
00120   if (CORBA::is_nil (consumer.in ()))
00121     return;
00122 
00123   ACE_TRY
00124     {
00125       consumer->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00126       ACE_TRY_CHECK;
00127     }
00128   ACE_CATCHANY
00129     {
00130       // Ignore exceptions, we must isolate other clients from
00131       // problems on this one.
00132     }
00133   ACE_ENDTRY;
00134 }
00135 
00136 void
00137 TAO_EC_ProxyPushSupplier::cleanup_i (void)
00138 {
00139   this->consumer_ =
00140     RtecEventComm::PushConsumer::_nil ();
00141 
00142   // @@ Why don't we have a destroy() method in the
00143   // filter_builder?
00144   delete this->child_;
00145   this->child_ = 0;
00146 }
00147 
00148 void
00149 TAO_EC_ProxyPushSupplier::deactivate (ACE_ENV_SINGLE_ARG_DECL) ACE_THROW_SPEC (())
00150 {
00151   ACE_TRY
00152     {
00153       PortableServer::ObjectId id =
00154         this->object_id (ACE_ENV_SINGLE_ARG_PARAMETER);
00155       ACE_TRY_CHECK;
00156       this->default_POA_->deactivate_object (id ACE_ENV_ARG_PARAMETER);
00157       ACE_TRY_CHECK;
00158     }
00159   ACE_CATCHANY
00160     {
00161       // Exceptions here should not be propagated.  They usually
00162       // indicate that an object is beign disconnected twice, or some
00163       // race condition, but not a fault that the user needs to know
00164       // about.
00165     }
00166   ACE_ENDTRY;
00167 }
00168 
00169 CORBA::ULong
00170 TAO_EC_ProxyPushSupplier::_incr_refcnt (void)
00171 {
00172   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00173   return this->refcount_++;
00174 }
00175 
00176 void
00177 TAO_EC_ProxyPushSupplier::refcount_zero_hook (void)
00178 {
00179   // Use the event channel
00180   this->event_channel_->destroy_proxy (this);
00181 }
00182 
00183 CORBA::ULong
00184 TAO_EC_ProxyPushSupplier::_decr_refcnt (void)
00185 {
00186   {
00187     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00188     this->refcount_--;
00189     if (this->refcount_ != 0)
00190       return this->refcount_;
00191   }
00192 
00193   this->refcount_zero_hook ();
00194   return 0;
00195 }
00196 
00197 typedef TAO_ESF_Proxy_RefCount_Guard<TAO_EC_Event_Channel_Base,TAO_EC_ProxyPushSupplier> Destroy_Guard;
00198 
00199 int
00200 TAO_EC_ProxyPushSupplier::filter (const RtecEventComm::EventSet& event,
00201                                   TAO_EC_QOS_Info& qos_info
00202                                   ACE_ENV_ARG_DECL)
00203 {
00204   Destroy_Guard auto_destroy (this->refcount_,
00205                               this->event_channel_,
00206                               this);
00207 
00208   int result = 0;
00209   {
00210     ACE_GUARD_THROW_EX (
00211             ACE_Lock, ace_mon, *this->lock_,
00212             RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00213     ACE_CHECK_RETURN (0);
00214 
00215     if (this->is_connected_i () == 0)
00216       return 0;
00217 
00218     result =
00219       this->child_->filter (event, qos_info ACE_ENV_ARG_PARAMETER);
00220     ACE_CHECK_RETURN (0);
00221   }
00222   return result;
00223 }
00224 
00225 int
00226 TAO_EC_ProxyPushSupplier::filter_nocopy (RtecEventComm::EventSet& event,
00227                                          TAO_EC_QOS_Info& qos_info
00228                                          ACE_ENV_ARG_DECL)
00229 {
00230   Destroy_Guard auto_destroy (this->refcount_,
00231                               this->event_channel_,
00232                               this);
00233 
00234   int result = 0;
00235   {
00236     ACE_GUARD_THROW_EX (
00237             ACE_Lock, ace_mon, *this->lock_,
00238             RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00239     ACE_CHECK_RETURN (0);
00240 
00241     if (this->is_connected_i () == 0)
00242       return 0;
00243 
00244     result =
00245       this->child_->filter_nocopy (event, qos_info ACE_ENV_ARG_PARAMETER);
00246     ACE_CHECK_RETURN (0);
00247   }
00248   return result;
00249 }
00250 
00251 void
00252 TAO_EC_ProxyPushSupplier::push (const RtecEventComm::EventSet& event,
00253                                 TAO_EC_QOS_Info& qos_info
00254                                 ACE_ENV_ARG_DECL)
00255 {
00256   // The mutex is already held by the caller (usually the filter()
00257   // method)
00258   if (this->is_connected_i () == 0)
00259     return; // TAO_THROW (RtecEventComm::Disconnected ());????
00260 
00261   if (this->suspended_ != 0)
00262     return;
00263 
00264   TAO_ESF_RefCount_Guard<CORBA::ULong> ace_mon (this->refcount_);
00265   // The guard will decrement the reference count, notice that the
00266   // reference count can become 0, but this is not the right spot to
00267   // check for that and destroy the object.
00268   // If we did so then we would destroy the object, and consequently
00269   // the mutex, but the mutex is used later when the stack unwinds and
00270   // the filter() method tries to destroy the mutex (that originally
00271   // acquired the mutex in the first place).
00272   // So the correct thing to do is to just decrement the reference
00273   // count and let the filter() method do the destruction.
00274 
00275   RtecEventComm::PushConsumer_var consumer =
00276     RtecEventComm::PushConsumer::_duplicate (this->consumer_.in ());
00277 
00278   this->pre_dispatch_hook (const_cast<RtecEventComm::EventSet&> (event)
00279                            ACE_ENV_ARG_PARAMETER);
00280   ACE_CHECK;
00281 
00282   {
00283     // We have to release the lock to avoid dead-locks.
00284     TAO_EC_Unlock reverse_lock (*this->lock_);
00285 
00286     ACE_GUARD_THROW_EX (TAO_EC_Unlock, ace_mon, reverse_lock,
00287                         RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00288     ACE_CHECK;
00289 
00290     this->event_channel_->dispatching ()->push (this,
00291                                                 consumer.in (),
00292                                                 event,
00293                                                 qos_info
00294                                                  ACE_ENV_ARG_PARAMETER);
00295     ACE_CHECK;
00296   }
00297 
00298   if (this->child_ != 0)
00299     this->child_->clear ();
00300 }
00301 
00302 void
00303 TAO_EC_ProxyPushSupplier::pre_dispatch_hook (RtecEventComm::EventSet&
00304                                              ACE_ENV_ARG_DECL_NOT_USED)
00305 {
00306 }
00307 
00308 void
00309 TAO_EC_ProxyPushSupplier::push_nocopy (RtecEventComm::EventSet& event,
00310                                        TAO_EC_QOS_Info& qos_info
00311                                        ACE_ENV_ARG_DECL)
00312 {
00313   // The mutex is already held by the caller (usually the filter()
00314   // method)
00315   if (this->is_connected_i () == 0)
00316     return; // TAO_THROW (RtecEventComm::Disconnected ());????
00317 
00318   if (this->suspended_ != 0)
00319     return;
00320 
00321   TAO_ESF_RefCount_Guard<CORBA::ULong> ace_mon (this->refcount_);
00322   // The guard will decrement the reference count, notice that the
00323   // reference count can become 0, but this is not the right spot to
00324   // check for that and destroy the object.
00325   // If we did so then we would destroy the object, and consequently
00326   // the mutex, but the mutex is used later when the stack unwinds and
00327   // the filter() method tries to destroy the mutex (that originally
00328   // acquired the mutex in the first place).
00329   // So the correct thing to do is to just decrement the reference
00330   // count and let the filter() method do the destruction.
00331 
00332   RtecEventComm::PushConsumer_var consumer =
00333     RtecEventComm::PushConsumer::_duplicate (this->consumer_.in ());
00334 
00335   this->pre_dispatch_hook (event ACE_ENV_ARG_PARAMETER);
00336   ACE_CHECK;
00337 
00338   {
00339     TAO_EC_Unlock reverse_lock (*this->lock_);
00340 
00341     ACE_GUARD_THROW_EX (TAO_EC_Unlock, ace_mon, reverse_lock,
00342                         RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00343     ACE_CHECK;
00344 
00345     this->event_channel_->dispatching ()->push_nocopy (this,
00346                                                        consumer.in (),
00347                                                        event,
00348                                                        qos_info
00349                                                         ACE_ENV_ARG_PARAMETER);
00350     ACE_CHECK;
00351   }
00352 
00353   if (this->child_ != 0)
00354     this->child_->clear ();
00355 }
00356 
00357 void
00358 TAO_EC_ProxyPushSupplier::push_to_consumer (
00359     RtecEventComm::PushConsumer_ptr consumer,
00360     const RtecEventComm::EventSet& event
00361     ACE_ENV_ARG_DECL)
00362 {
00363   {
00364     ACE_GUARD_THROW_EX (
00365             ACE_Lock, ace_mon, *this->lock_,
00366             RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00367     ACE_CHECK;
00368 
00369     if (this->is_connected_i () == 0)
00370       return; // ACE_THROW (RtecEventComm::Disconnected ());????
00371 
00372     if (this->suspended_ != 0)
00373       return;
00374   }
00375 
00376   ACE_TRY
00377     {
00378       consumer->push (event ACE_ENV_ARG_PARAMETER);
00379       ACE_TRY_CHECK;
00380     }
00381   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_used)
00382     {
00383       // Do not report errors for old consumers
00384       // NOTE: The comparison below is not completely correct, it
00385       // could be that the remote consumer and the local consumer are
00386       // in fact the same object, but represented by different objects
00387       // references.  Unfortunately this is not a good spot to invoke
00388       // _is_equivalent(), and that may not give us the desired answer
00389       // anyway.
00390       if (consumer == this->consumer_.in ())
00391         {
00392           TAO_EC_ConsumerControl *control =
00393             this->event_channel_->consumer_control ();
00394 
00395           control->consumer_not_exist (this ACE_ENV_ARG_PARAMETER);
00396           ACE_TRY_CHECK;
00397         }
00398     }
00399   ACE_CATCH (CORBA::SystemException, sysex)
00400     {
00401       // Do not report errors for old consumers
00402       // NOTE: The comparison below is not completely correct, it
00403       // could be that the remote consumer and the local consumer are
00404       // in fact the same object, but represented by different objects
00405       // references.  Unfortunately this is not a good spot to invoke
00406       // _is_equivalent(), and that may not give us the desired answer
00407       // anyway.
00408       if (consumer == this->consumer_.in ())
00409         {
00410           TAO_EC_ConsumerControl *control =
00411             this->event_channel_->consumer_control ();
00412 
00413           control->system_exception (this,
00414                                      sysex
00415                                       ACE_ENV_ARG_PARAMETER);
00416           ACE_TRY_CHECK;
00417         }
00418     }
00419   ACE_CATCHANY
00420     {
00421       // Shouldn't happen, but does not hurt
00422     }
00423   ACE_ENDTRY;
00424 }
00425 
00426 void
00427 TAO_EC_ProxyPushSupplier::reactive_push_to_consumer (
00428     RtecEventComm::PushConsumer_ptr consumer,
00429     const RtecEventComm::EventSet& event
00430     ACE_ENV_ARG_DECL)
00431 {
00432   ACE_TRY
00433     {
00434       consumer->push (event ACE_ENV_ARG_PARAMETER);
00435       ACE_TRY_CHECK;
00436     }
00437   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_used)
00438     {
00439       TAO_EC_ConsumerControl *control =
00440         this->event_channel_->consumer_control ();
00441 
00442       control->consumer_not_exist (this ACE_ENV_ARG_PARAMETER);
00443       ACE_CHECK;
00444     }
00445   ACE_CATCH (CORBA::SystemException, sysex)
00446     {
00447       TAO_EC_ConsumerControl *control =
00448         this->event_channel_->consumer_control ();
00449 
00450       control->system_exception (this,
00451                                  sysex
00452                                   ACE_ENV_ARG_PARAMETER);
00453       ACE_CHECK;
00454     }
00455   ACE_CATCHANY
00456     {
00457       // Shouldn't happen
00458     }
00459   ACE_ENDTRY;
00460 }
00461 
00462 CORBA::Boolean
00463 TAO_EC_ProxyPushSupplier::consumer_non_existent (
00464       CORBA::Boolean_out disconnected
00465       ACE_ENV_ARG_DECL)
00466 {
00467   CORBA::Object_var consumer;
00468   {
00469     ACE_GUARD_THROW_EX (
00470         ACE_Lock, ace_mon, *this->lock_,
00471         CORBA::INTERNAL ());
00472     ACE_CHECK_RETURN (0);
00473 
00474     disconnected = 0;
00475     if (this->is_connected_i () == 0)
00476       {
00477         disconnected = 1;
00478         return 0;
00479       }
00480 
00481     consumer = CORBA::Object::_duplicate (this->consumer_.in ());
00482   }
00483 
00484 #if (TAO_HAS_MINIMUM_CORBA == 0)
00485   return consumer->_non_existent (ACE_ENV_SINGLE_ARG_PARAMETER);
00486 #else
00487   return 0;
00488 #endif /* TAO_HAS_MINIMUM_CORBA */
00489 }
00490 
00491 void
00492 TAO_EC_ProxyPushSupplier::clear (void)
00493 {
00494   ACE_GUARD (ACE_Lock, ace_mon, *this->lock_);
00495 
00496   this->child_->clear ();
00497 }
00498 
00499 CORBA::ULong
00500 TAO_EC_ProxyPushSupplier::max_event_size (void) const
00501 {
00502   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00503 
00504   return this->child_->max_event_size ();
00505 }
00506 
00507 int
00508 TAO_EC_ProxyPushSupplier::can_match (
00509       const RtecEventComm::EventHeader &header) const
00510 {
00511   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00512 
00513   if (this->is_connected_i () == 0)
00514     return 0;
00515 
00516   return this->child_->can_match (header);
00517 }
00518 
00519 int
00520 TAO_EC_ProxyPushSupplier::add_dependencies (
00521       const RtecEventComm::EventHeader &header,
00522       const TAO_EC_QOS_Info &qos_info
00523       ACE_ENV_ARG_DECL)
00524 {
00525   ACE_GUARD_THROW_EX (
00526           ACE_Lock, ace_mon, *this->lock_,
00527           RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00528   ACE_CHECK_RETURN (0);
00529 
00530   return this->child_->add_dependencies (header,
00531                                          qos_info
00532                                          ACE_ENV_ARG_PARAMETER);
00533 }
00534 
00535 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:11:09 2006 for TAO_RTEvent by doxygen 1.3.6