00001
00002
00003 #include "orbsvcs/CosEvent/CEC_ProxyPullSupplier.h"
00004 #include "orbsvcs/CosEvent/CEC_Dispatching.h"
00005 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00006
00007 #include "ace/Reverse_Lock_T.h"
00008
00009 #if ! defined (__ACE_INLINE__)
00010 #include "orbsvcs/CosEvent/CEC_ProxyPullSupplier.i"
00011 #endif
00012
00013 ACE_RCSID (CosEvent,
00014 CEC_ProxyPullSupplier,
00015 "CEC_ProxyPullSupplier.cpp,v 1.22 2006/03/14 06:14:24 jtc Exp")
00016
00017 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00018
// Reverse-lock adapter over ACE_Lock.  It is used in
// connect_pull_consumer() around the reconnected() callback while the
// proxy's own lock_ is held by an enclosing guard.
// NOTE(review): relies on ACE_Reverse_Lock semantics (acquiring the
// adapter releases the wrapped lock) -- confirm against the ACE docs.
00019 typedef ACE_Reverse_Lock<ACE_Lock> TAO_CEC_Unlock;
00020
00021 TAO_CEC_ProxyPullSupplier::TAO_CEC_ProxyPullSupplier (TAO_CEC_EventChannel* ec)
00022 : event_channel_ (ec),
00023 refcount_ (1),
00024 connected_ (0),
00025 wait_not_empty_ (queue_lock_)
00026 {
00027 this->lock_ =
00028 this->event_channel_->create_supplier_lock ();
00029
00030 this->default_POA_ =
00031 this->event_channel_->supplier_poa ();
00032
00033 this->event_channel_->get_servant_retry_map ().bind (this, 0);
00034 }
00035
00036 TAO_CEC_ProxyPullSupplier::~TAO_CEC_ProxyPullSupplier (void)
00037 {
00038 this->event_channel_->get_servant_retry_map ().unbind (this);
00039 this->event_channel_->destroy_supplier_lock (this->lock_);
00040 }
00041
00042 void
00043 TAO_CEC_ProxyPullSupplier::activate (
00044 CosEventChannelAdmin::ProxyPullSupplier_ptr &activated_proxy
00045 ACE_ENV_ARG_DECL)
00046 ACE_THROW_SPEC ((CORBA::SystemException))
00047 {
00048 CosEventChannelAdmin::ProxyPullSupplier_var result;
00049 ACE_TRY
00050 {
00051 result = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
00052 ACE_TRY_CHECK;
00053 }
00054 ACE_CATCHANY
00055 {
00056 result = CosEventChannelAdmin::ProxyPullSupplier::_nil ();
00057 }
00058 ACE_ENDTRY;
00059 activated_proxy = result._retn ();
00060 }
00061
00062 void
00063 TAO_CEC_ProxyPullSupplier::deactivate (ACE_ENV_SINGLE_ARG_DECL)
00064 ACE_THROW_SPEC ((CORBA::SystemException))
00065 {
00066 ACE_TRY
00067 {
00068 PortableServer::POA_var poa =
00069 this->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
00070 ACE_TRY_CHECK;
00071 PortableServer::ObjectId_var id =
00072 poa->servant_to_id (this ACE_ENV_ARG_PARAMETER);
00073 ACE_TRY_CHECK;
00074 poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
00075 ACE_TRY_CHECK;
00076 }
00077 ACE_CATCHANY
00078 {
00079
00080
00081
00082
00083 }
00084 ACE_ENDTRY;
00085 }
00086
00087 void
00088 TAO_CEC_ProxyPullSupplier::shutdown (ACE_ENV_SINGLE_ARG_DECL)
00089 {
00090
00091
00092 CosEventComm::PullConsumer_var consumer;
00093
00094 {
00095 ACE_GUARD_THROW_EX (
00096 ACE_Lock, ace_mon, *this->lock_,
00097 CORBA::INTERNAL ());
00098
00099 ACE_CHECK;
00100
00101 consumer = this->consumer_._retn ();
00102 }
00103
00104 this->deactivate (ACE_ENV_SINGLE_ARG_PARAMETER);
00105 ACE_CHECK;
00106
00107 if (CORBA::is_nil (consumer.in ()))
00108 return;
00109
00110 ACE_TRY
00111 {
00112 consumer->disconnect_pull_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00113 ACE_TRY_CHECK;
00114 }
00115 ACE_CATCHANY
00116 {
00117
00118
00119 }
00120 ACE_ENDTRY;
00121 }
00122
00123 CORBA::Boolean
00124 TAO_CEC_ProxyPullSupplier::consumer_non_existent (
00125 CORBA::Boolean_out disconnected
00126 ACE_ENV_ARG_DECL)
00127 {
00128 CORBA::Object_var consumer;
00129 {
00130 ACE_GUARD_THROW_EX (
00131 ACE_Lock, ace_mon, *this->lock_,
00132 CORBA::INTERNAL ());
00133 ACE_CHECK_RETURN (0);
00134
00135 disconnected = 0;
00136 if (this->is_connected_i () == 0)
00137 {
00138 disconnected = 1;
00139 return 0;
00140 }
00141 if (CORBA::is_nil (this->consumer_.in ()))
00142 {
00143 return 0;
00144 }
00145 consumer = CORBA::Object::_duplicate (this->consumer_.in ());
00146 }
00147
00148 #if (TAO_HAS_MINIMUM_CORBA == 0)
00149 return consumer->_non_existent (ACE_ENV_SINGLE_ARG_PARAMETER);
00150 #else
00151 return 0;
00152 #endif
00153 }
00154
00155 void
00156 TAO_CEC_ProxyPullSupplier::push (const CORBA::Any &event
00157 ACE_ENV_ARG_DECL_NOT_USED)
00158 {
00159 if (this->is_connected () == 0)
00160 return;
00161
00162 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->queue_lock_);
00163
00164
00165 (void) this->queue_.enqueue_tail (event);
00166
00167 this->wait_not_empty_.signal ();
00168 }
00169
00170 CORBA::Any *
00171 TAO_CEC_ProxyPullSupplier::pull (ACE_ENV_SINGLE_ARG_DECL)
00172 ACE_THROW_SPEC ((CORBA::SystemException,
00173 CosEventComm::Disconnected))
00174 {
00175 if (this->is_connected () == 0)
00176 ACE_THROW_RETURN (CosEventComm::Disconnected (), 0);
00177
00178 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, queue_lock_, 0);
00179 while (this->queue_.is_empty ())
00180 {
00181 this->wait_not_empty_.wait ();
00182 }
00183 CORBA::Any any;
00184 if (this->queue_.dequeue_head (any) != 0)
00185 {
00186 ACE_THROW_RETURN (CORBA::INTERNAL (), 0);
00187 }
00188 return new CORBA::Any (any);
00189 }
00190
00191 CORBA::Any *
00192 TAO_CEC_ProxyPullSupplier::try_pull (CORBA::Boolean_out has_event
00193 ACE_ENV_ARG_DECL)
00194 ACE_THROW_SPEC ((CORBA::SystemException,CosEventComm::Disconnected))
00195 {
00196 has_event = 0;
00197 if (this->is_connected () == 0)
00198 ACE_THROW_RETURN (CosEventComm::Disconnected (), 0);
00199
00200 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, queue_lock_, 0);
00201 CORBA::Any any;
00202 if (this->queue_.is_empty ())
00203 {
00204 any <<= CORBA::Long(0);
00205 return new CORBA::Any (any);
00206 }
00207
00208 if (this->queue_.dequeue_head (any) != 0)
00209 {
00210 ACE_THROW_RETURN (CORBA::INTERNAL (), 0);
00211 }
00212 has_event = 1;
00213 return new CORBA::Any (any);
00214 }
00215
00216 void
00217 TAO_CEC_ProxyPullSupplier::cleanup_i (void)
00218 {
00219 this->consumer_ =
00220 CosEventComm::PullConsumer::_nil ();
00221 this->connected_ = 0;
00222 }
00223
00224 CORBA::ULong
00225 TAO_CEC_ProxyPullSupplier::_incr_refcnt (void)
00226 {
00227 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00228 return this->refcount_++;
00229 }
00230
00231 CORBA::ULong
00232 TAO_CEC_ProxyPullSupplier::_decr_refcnt (void)
00233 {
00234 {
00235 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00236 this->refcount_--;
00237 if (this->refcount_ != 0)
00238 return this->refcount_;
00239 }
00240
00241
00242 this->event_channel_->destroy_proxy (this);
00243 return 0;
00244 }
00245
00246 void
00247 TAO_CEC_ProxyPullSupplier::connect_pull_consumer (
00248 CosEventComm::PullConsumer_ptr pull_consumer
00249 ACE_ENV_ARG_DECL)
00250 ACE_THROW_SPEC ((CORBA::SystemException,
00251 CosEventChannelAdmin::AlreadyConnected))
00252 {
00253 {
00254 ACE_GUARD_THROW_EX (
00255 ACE_Lock, ace_mon, *this->lock_,
00256 CORBA::INTERNAL ());
00257
00258 ACE_CHECK;
00259
00260 if (this->is_connected_i ())
00261 {
00262 if (this->event_channel_->consumer_reconnect () == 0)
00263 ACE_THROW (CosEventChannelAdmin::AlreadyConnected ());
00264
00265
00266 this->cleanup_i ();
00267
00268 this->consumer_ =
00269 CosEventComm::PullConsumer::_duplicate (pull_consumer);
00270 this->connected_ = 1;
00271
00272 TAO_CEC_Unlock reverse_lock (*this->lock_);
00273
00274 {
00275 ACE_GUARD_THROW_EX (
00276 TAO_CEC_Unlock, ace_mon, reverse_lock,
00277 CORBA::INTERNAL ());
00278
00279 ACE_CHECK;
00280
00281 this->event_channel_->reconnected (this ACE_ENV_ARG_PARAMETER);
00282 ACE_CHECK;
00283 }
00284 return;
00285 }
00286
00287 this->consumer_ =
00288 CosEventComm::PullConsumer::_duplicate (pull_consumer);
00289 this->connected_ = 1;
00290 }
00291
00292
00293 this->event_channel_->connected (this ACE_ENV_ARG_PARAMETER);
00294 }
00295
00296 void
00297 TAO_CEC_ProxyPullSupplier::disconnect_pull_supplier (
00298 ACE_ENV_SINGLE_ARG_DECL)
00299 ACE_THROW_SPEC ((CORBA::SystemException))
00300 {
00301 CosEventComm::PullConsumer_var consumer;
00302
00303 {
00304 ACE_GUARD_THROW_EX (
00305 ACE_Lock, ace_mon, *this->lock_,
00306 CORBA::INTERNAL ());
00307
00308 ACE_CHECK;
00309
00310 if (this->is_connected_i () == 0)
00311 ACE_THROW (CORBA::BAD_INV_ORDER ());
00312
00313 consumer = this->consumer_._retn ();
00314
00315 this->cleanup_i ();
00316 }
00317
00318
00319 this->event_channel_->disconnected (this ACE_ENV_ARG_PARAMETER);
00320 ACE_CHECK;
00321
00322 if (CORBA::is_nil (consumer.in ()))
00323 return;
00324
00325 if (this->event_channel_->disconnect_callbacks ())
00326 {
00327 ACE_TRY
00328 {
00329 consumer->disconnect_pull_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00330 ACE_TRY_CHECK;
00331 }
00332 ACE_CATCHANY
00333 {
00334
00335
00336 ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
00337 "ProxySupplier::disconnect_pull_supplier");
00338 }
00339 ACE_ENDTRY;
00340 }
00341 }
00342
00343 PortableServer::POA_ptr
00344 TAO_CEC_ProxyPullSupplier::_default_POA (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00345 {
00346 return PortableServer::POA::_duplicate (this->default_POA_.in ());
00347 }
00348
00349 void
00350 TAO_CEC_ProxyPullSupplier::_add_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00351 {
00352 this->_incr_refcnt ();
00353 }
00354
00355 void
00356 TAO_CEC_ProxyPullSupplier::_remove_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00357 {
00358 this->_decr_refcnt ();
00359 }
00360
00361 TAO_END_VERSIONED_NAMESPACE_DECL