CEC_ProxyPullSupplier.cpp

Go to the documentation of this file.
00001 // $Id: CEC_ProxyPullSupplier.cpp 77001 2007-02-12 07:54:49Z johnnyw $
00002 
00003 #include "orbsvcs/CosEvent/CEC_ProxyPullSupplier.h"
00004 #include "orbsvcs/CosEvent/CEC_Dispatching.h"
00005 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00006 
00007 #include "ace/Reverse_Lock_T.h"
00008 
00009 #if ! defined (__ACE_INLINE__)
00010 #include "orbsvcs/CosEvent/CEC_ProxyPullSupplier.inl"
00011 #endif /* __ACE_INLINE__ */
00012 
00013 ACE_RCSID (CosEvent,
00014            CEC_ProxyPullSupplier,
00015            "$Id: CEC_ProxyPullSupplier.cpp 77001 2007-02-12 07:54:49Z johnnyw $")
00016 
00017 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00018 
00019 typedef ACE_Reverse_Lock<ACE_Lock> TAO_CEC_Unlock;
00020 
00021 TAO_CEC_ProxyPullSupplier::TAO_CEC_ProxyPullSupplier
00022 (TAO_CEC_EventChannel* ec, const ACE_Time_Value &timeout)
00023   : event_channel_ (ec),
00024     timeout_ (timeout),
00025     refcount_ (1),
00026     connected_ (0),
00027     wait_not_empty_ (queue_lock_)
00028 {
00029   this->lock_ =
00030     this->event_channel_->create_supplier_lock ();
00031 
00032   this->default_POA_ =
00033     this->event_channel_->supplier_poa ();
00034 
00035   this->event_channel_->get_servant_retry_map ().bind (this, 0);
00036 }
00037 
00038 TAO_CEC_ProxyPullSupplier::~TAO_CEC_ProxyPullSupplier (void)
00039 {
00040   this->event_channel_->get_servant_retry_map ().unbind (this);
00041   this->event_channel_->destroy_supplier_lock (this->lock_);
00042 }
00043 
00044 void
00045 TAO_CEC_ProxyPullSupplier::activate (
00046     CosEventChannelAdmin::ProxyPullSupplier_ptr &activated_proxy)
00047 {
00048   CosEventChannelAdmin::ProxyPullSupplier_var result;
00049   try
00050     {
00051       result = this->_this ();
00052     }
00053   catch (const CORBA::Exception&)
00054     {
00055       result = CosEventChannelAdmin::ProxyPullSupplier::_nil ();
00056     }
00057   activated_proxy = result._retn ();
00058 }
00059 
00060 void
00061 TAO_CEC_ProxyPullSupplier::deactivate (void)
00062 {
00063   try
00064     {
00065       PortableServer::POA_var poa =
00066         this->_default_POA ();
00067       PortableServer::ObjectId_var id =
00068         poa->servant_to_id (this);
00069       poa->deactivate_object (id.in ());
00070     }
00071   catch (const CORBA::Exception&)
00072     {
00073       // Exceptions here should not be propagated.  They usually
00074       // indicate that an object is beign disconnected twice, or some
00075       // race condition, but not a fault that the user needs to know
00076       // about.
00077     }
00078 }
00079 
00080 void
00081 TAO_CEC_ProxyPullSupplier::shutdown (void)
00082 {
00083   // Save the consumer we where connected to, we need to send a
00084   // disconnect message to it.
00085   CosEventComm::PullConsumer_var consumer;
00086 
00087   {
00088     ACE_GUARD_THROW_EX (
00089         ACE_Lock, ace_mon, *this->lock_,
00090         CORBA::INTERNAL ());
00091     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00092 
00093     consumer = this->consumer_._retn ();
00094   }
00095 
00096   this->deactivate ();
00097 
00098   if (CORBA::is_nil (consumer.in ()))
00099     return;
00100 
00101   try
00102     {
00103       consumer->disconnect_pull_consumer ();
00104     }
00105   catch (const CORBA::Exception&)
00106     {
00107       // Ignore exceptions, we must isolate other clients from
00108       // problems on this one.
00109     }
00110 }
00111 
00112 CORBA::Boolean
00113 TAO_CEC_ProxyPullSupplier::consumer_non_existent (
00114       CORBA::Boolean_out disconnected)
00115 {
00116   CORBA::Object_var consumer;
00117   {
00118     ACE_GUARD_THROW_EX (
00119         ACE_Lock, ace_mon, *this->lock_,
00120         CORBA::INTERNAL ());
00121 
00122     disconnected = 0;
00123     if (this->is_connected_i () == 0)
00124       {
00125         disconnected = 1;
00126         return 0;
00127       }
00128     if (CORBA::is_nil (this->nopolicy_consumer_.in ()))
00129       {
00130         return 0;
00131       }
00132     consumer = CORBA::Object::_duplicate (this->nopolicy_consumer_.in ());
00133   }
00134 
00135 #if (TAO_HAS_MINIMUM_CORBA == 0)
00136   return consumer->_non_existent ();
00137 #else
00138   return 0;
00139 #endif /* TAO_HAS_MINIMUM_CORBA */
00140 }
00141 
00142 void
00143 TAO_CEC_ProxyPullSupplier::push (const CORBA::Any &event)
00144 {
00145   if (this->is_connected () == 0)
00146     return;
00147 
00148   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->queue_lock_);
00149 
00150   // Ignore errors
00151   (void) this->queue_.enqueue_tail (event);
00152 
00153   this->wait_not_empty_.signal ();
00154 }
00155 
00156 CORBA::Any *
00157 TAO_CEC_ProxyPullSupplier::pull (void)
00158 {
00159   if (this->is_connected () == 0)
00160     throw CosEventComm::Disconnected ();
00161 
00162   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, queue_lock_, 0);
00163   while (this->queue_.is_empty ())
00164     {
00165       this->wait_not_empty_.wait ();
00166     }
00167   CORBA::Any any;
00168   if (this->queue_.dequeue_head (any) != 0)
00169     {
00170       throw CORBA::INTERNAL ();
00171     }
00172   return new CORBA::Any (any);
00173 }
00174 
00175 CORBA::Any *
00176 TAO_CEC_ProxyPullSupplier::try_pull (CORBA::Boolean_out has_event)
00177 {
00178   has_event = 0;
00179   if (this->is_connected () == 0)
00180     throw CosEventComm::Disconnected ();
00181 
00182   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, queue_lock_, 0);
00183   CORBA::Any any;
00184   if (this->queue_.is_empty ())
00185     {
00186       any <<= CORBA::Long(0);
00187       return new CORBA::Any (any);
00188     }
00189 
00190   if (this->queue_.dequeue_head (any) != 0)
00191     {
00192       throw CORBA::INTERNAL ();
00193     }
00194   has_event = 1;
00195   return new CORBA::Any (any);
00196 }
00197 
00198 void
00199 TAO_CEC_ProxyPullSupplier::cleanup_i (void)
00200 {
00201   this->consumer_ =
00202     CosEventComm::PullConsumer::_nil ();
00203   this->connected_ = 0;
00204 }
00205 
00206 CORBA::ULong
00207 TAO_CEC_ProxyPullSupplier::_incr_refcnt (void)
00208 {
00209   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00210   return this->refcount_++;
00211 }
00212 
00213 CORBA::ULong
00214 TAO_CEC_ProxyPullSupplier::_decr_refcnt (void)
00215 {
00216   {
00217     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00218     this->refcount_--;
00219     if (this->refcount_ != 0)
00220       return this->refcount_;
00221   }
00222 
00223   // Notify the event channel
00224   this->event_channel_->destroy_proxy (this);
00225   return 0;
00226 }
00227 
00228 void
00229 TAO_CEC_ProxyPullSupplier::connect_pull_consumer (
00230       CosEventComm::PullConsumer_ptr pull_consumer)
00231 {
00232   {
00233     ACE_GUARD_THROW_EX (
00234         ACE_Lock, ace_mon, *this->lock_,
00235         CORBA::INTERNAL ());
00236     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00237 
00238     if (this->is_connected_i ())
00239       {
00240         if (this->event_channel_->consumer_reconnect () == 0)
00241           throw CosEventChannelAdmin::AlreadyConnected ();
00242 
00243         // Re-connections are allowed....
00244         this->cleanup_i ();
00245 
00246         this->consumer_ = apply_policy (pull_consumer);
00247         this->connected_ = 1;
00248 
00249         TAO_CEC_Unlock reverse_lock (*this->lock_);
00250 
00251         {
00252           ACE_GUARD_THROW_EX (
00253               TAO_CEC_Unlock, ace_mon, reverse_lock,
00254               CORBA::INTERNAL ());
00255           // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00256 
00257           this->event_channel_->reconnected (this);
00258         }
00259         return;
00260       }
00261 
00262     this->consumer_ = apply_policy (pull_consumer);
00263     this->connected_ = 1;
00264   }
00265 
00266   // Notify the event channel...
00267   this->event_channel_->connected (this);
00268 }
00269 
00270 CosEventComm::PullConsumer_ptr
00271 TAO_CEC_ProxyPullSupplier::apply_policy (CosEventComm::PullConsumer_ptr pre)
00272 {
00273   if (CORBA::is_nil (pre)) return pre;
00274   this->nopolicy_consumer_ = CosEventComm::PullConsumer::_duplicate (pre);
00275 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00276   CosEventComm::PullConsumer_var post =
00277     CosEventComm::PullConsumer::_duplicate (pre);
00278   if (this->timeout_ > ACE_Time_Value::zero)
00279     {
00280       CORBA::PolicyList policy_list;
00281       policy_list.length (1);
00282       policy_list[0] = this->event_channel_->
00283         create_roundtrip_timeout_policy (this->timeout_);
00284 
00285       CORBA::Object_var post_obj = pre->_set_policy_overrides
00286         (policy_list, CORBA::ADD_OVERRIDE);
00287       post = CosEventComm::PullConsumer::_narrow(post_obj.in ());
00288 
00289       policy_list[0]->destroy ();
00290       policy_list.length (0);
00291     }
00292   return post._retn ();
00293 #else
00294   return CosEventComm::PullConsumer::_duplicate (pre);
00295 #endif /* TAO_HAS_CORBA_MESSAGING */
00296 }
00297 
00298 void
00299 TAO_CEC_ProxyPullSupplier::disconnect_pull_supplier ()
00300 {
00301   CosEventComm::PullConsumer_var consumer;
00302 
00303   {
00304     ACE_GUARD_THROW_EX (
00305         ACE_Lock, ace_mon, *this->lock_,
00306         CORBA::INTERNAL ());
00307     // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00308 
00309     if (this->is_connected_i () == 0)
00310       throw CORBA::BAD_INV_ORDER ();
00311 
00312     consumer = this->consumer_._retn ();
00313 
00314     this->cleanup_i ();
00315   }
00316 
00317   // Notify the event channel....
00318   this->event_channel_->disconnected (this);
00319 
00320   if (CORBA::is_nil (consumer.in ()))
00321     return;
00322 
00323   if (this->event_channel_->disconnect_callbacks ())
00324     {
00325       try
00326         {
00327           consumer->disconnect_pull_consumer ();
00328         }
00329       catch (const CORBA::Exception& ex)
00330         {
00331           // Ignore exceptions, we must isolate other clients from
00332           // problems on this one.
00333           ex._tao_print_exception ("ProxySupplier::disconnect_pull_supplier");
00334         }
00335     }
00336 }
00337 
00338 PortableServer::POA_ptr
00339 TAO_CEC_ProxyPullSupplier::_default_POA (void)
00340 {
00341   return PortableServer::POA::_duplicate (this->default_POA_.in ());
00342 }
00343 
00344 void
00345 TAO_CEC_ProxyPullSupplier::_add_ref (void)
00346 {
00347   this->_incr_refcnt ();
00348 }
00349 
00350 void
00351 TAO_CEC_ProxyPullSupplier::_remove_ref (void)
00352 {
00353   this->_decr_refcnt ();
00354 }
00355 
00356 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:44:51 2010 for TAO_CosEvent by  doxygen 1.4.7