00001
00002
00003 #include "orbsvcs/CosEvent/CEC_ProxyPullSupplier.h"
00004 #include "orbsvcs/CosEvent/CEC_Dispatching.h"
00005 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00006
00007 #include "ace/Reverse_Lock_T.h"
00008
00009 #if ! defined (__ACE_INLINE__)
00010 #include "orbsvcs/CosEvent/CEC_ProxyPullSupplier.inl"
00011 #endif
00012
00013 ACE_RCSID (CosEvent,
00014 CEC_ProxyPullSupplier,
00015 "$Id: CEC_ProxyPullSupplier.cpp 77001 2007-02-12 07:54:49Z johnnyw $")
00016
00017 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00018
00019 typedef ACE_Reverse_Lock<ACE_Lock> TAO_CEC_Unlock;
00020
00021 TAO_CEC_ProxyPullSupplier::TAO_CEC_ProxyPullSupplier
00022 (TAO_CEC_EventChannel* ec, const ACE_Time_Value &timeout)
00023 : event_channel_ (ec),
00024 timeout_ (timeout),
00025 refcount_ (1),
00026 connected_ (0),
00027 wait_not_empty_ (queue_lock_)
00028 {
00029 this->lock_ =
00030 this->event_channel_->create_supplier_lock ();
00031
00032 this->default_POA_ =
00033 this->event_channel_->supplier_poa ();
00034
00035 this->event_channel_->get_servant_retry_map ().bind (this, 0);
00036 }
00037
00038 TAO_CEC_ProxyPullSupplier::~TAO_CEC_ProxyPullSupplier (void)
00039 {
00040 this->event_channel_->get_servant_retry_map ().unbind (this);
00041 this->event_channel_->destroy_supplier_lock (this->lock_);
00042 }
00043
00044 void
00045 TAO_CEC_ProxyPullSupplier::activate (
00046 CosEventChannelAdmin::ProxyPullSupplier_ptr &activated_proxy)
00047 {
00048 CosEventChannelAdmin::ProxyPullSupplier_var result;
00049 try
00050 {
00051 result = this->_this ();
00052 }
00053 catch (const CORBA::Exception&)
00054 {
00055 result = CosEventChannelAdmin::ProxyPullSupplier::_nil ();
00056 }
00057 activated_proxy = result._retn ();
00058 }
00059
00060 void
00061 TAO_CEC_ProxyPullSupplier::deactivate (void)
00062 {
00063 try
00064 {
00065 PortableServer::POA_var poa =
00066 this->_default_POA ();
00067 PortableServer::ObjectId_var id =
00068 poa->servant_to_id (this);
00069 poa->deactivate_object (id.in ());
00070 }
00071 catch (const CORBA::Exception&)
00072 {
00073
00074
00075
00076
00077 }
00078 }
00079
00080 void
00081 TAO_CEC_ProxyPullSupplier::shutdown (void)
00082 {
00083
00084
00085 CosEventComm::PullConsumer_var consumer;
00086
00087 {
00088 ACE_GUARD_THROW_EX (
00089 ACE_Lock, ace_mon, *this->lock_,
00090 CORBA::INTERNAL ());
00091
00092
00093 consumer = this->consumer_._retn ();
00094 }
00095
00096 this->deactivate ();
00097
00098 if (CORBA::is_nil (consumer.in ()))
00099 return;
00100
00101 try
00102 {
00103 consumer->disconnect_pull_consumer ();
00104 }
00105 catch (const CORBA::Exception&)
00106 {
00107
00108
00109 }
00110 }
00111
00112 CORBA::Boolean
00113 TAO_CEC_ProxyPullSupplier::consumer_non_existent (
00114 CORBA::Boolean_out disconnected)
00115 {
00116 CORBA::Object_var consumer;
00117 {
00118 ACE_GUARD_THROW_EX (
00119 ACE_Lock, ace_mon, *this->lock_,
00120 CORBA::INTERNAL ());
00121
00122 disconnected = 0;
00123 if (this->is_connected_i () == 0)
00124 {
00125 disconnected = 1;
00126 return 0;
00127 }
00128 if (CORBA::is_nil (this->nopolicy_consumer_.in ()))
00129 {
00130 return 0;
00131 }
00132 consumer = CORBA::Object::_duplicate (this->nopolicy_consumer_.in ());
00133 }
00134
00135 #if (TAO_HAS_MINIMUM_CORBA == 0)
00136 return consumer->_non_existent ();
00137 #else
00138 return 0;
00139 #endif
00140 }
00141
00142 void
00143 TAO_CEC_ProxyPullSupplier::push (const CORBA::Any &event)
00144 {
00145 if (this->is_connected () == 0)
00146 return;
00147
00148 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->queue_lock_);
00149
00150
00151 (void) this->queue_.enqueue_tail (event);
00152
00153 this->wait_not_empty_.signal ();
00154 }
00155
00156 CORBA::Any *
00157 TAO_CEC_ProxyPullSupplier::pull (void)
00158 {
00159 if (this->is_connected () == 0)
00160 throw CosEventComm::Disconnected ();
00161
00162 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, queue_lock_, 0);
00163 while (this->queue_.is_empty ())
00164 {
00165 this->wait_not_empty_.wait ();
00166 }
00167 CORBA::Any any;
00168 if (this->queue_.dequeue_head (any) != 0)
00169 {
00170 throw CORBA::INTERNAL ();
00171 }
00172 return new CORBA::Any (any);
00173 }
00174
00175 CORBA::Any *
00176 TAO_CEC_ProxyPullSupplier::try_pull (CORBA::Boolean_out has_event)
00177 {
00178 has_event = 0;
00179 if (this->is_connected () == 0)
00180 throw CosEventComm::Disconnected ();
00181
00182 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, queue_lock_, 0);
00183 CORBA::Any any;
00184 if (this->queue_.is_empty ())
00185 {
00186 any <<= CORBA::Long(0);
00187 return new CORBA::Any (any);
00188 }
00189
00190 if (this->queue_.dequeue_head (any) != 0)
00191 {
00192 throw CORBA::INTERNAL ();
00193 }
00194 has_event = 1;
00195 return new CORBA::Any (any);
00196 }
00197
00198 void
00199 TAO_CEC_ProxyPullSupplier::cleanup_i (void)
00200 {
00201 this->consumer_ =
00202 CosEventComm::PullConsumer::_nil ();
00203 this->connected_ = 0;
00204 }
00205
00206 CORBA::ULong
00207 TAO_CEC_ProxyPullSupplier::_incr_refcnt (void)
00208 {
00209 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00210 return this->refcount_++;
00211 }
00212
00213 CORBA::ULong
00214 TAO_CEC_ProxyPullSupplier::_decr_refcnt (void)
00215 {
00216 {
00217 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00218 this->refcount_--;
00219 if (this->refcount_ != 0)
00220 return this->refcount_;
00221 }
00222
00223
00224 this->event_channel_->destroy_proxy (this);
00225 return 0;
00226 }
00227
00228 void
00229 TAO_CEC_ProxyPullSupplier::connect_pull_consumer (
00230 CosEventComm::PullConsumer_ptr pull_consumer)
00231 {
00232 {
00233 ACE_GUARD_THROW_EX (
00234 ACE_Lock, ace_mon, *this->lock_,
00235 CORBA::INTERNAL ());
00236
00237
00238 if (this->is_connected_i ())
00239 {
00240 if (this->event_channel_->consumer_reconnect () == 0)
00241 throw CosEventChannelAdmin::AlreadyConnected ();
00242
00243
00244 this->cleanup_i ();
00245
00246 this->consumer_ = apply_policy (pull_consumer);
00247 this->connected_ = 1;
00248
00249 TAO_CEC_Unlock reverse_lock (*this->lock_);
00250
00251 {
00252 ACE_GUARD_THROW_EX (
00253 TAO_CEC_Unlock, ace_mon, reverse_lock,
00254 CORBA::INTERNAL ());
00255
00256
00257 this->event_channel_->reconnected (this);
00258 }
00259 return;
00260 }
00261
00262 this->consumer_ = apply_policy (pull_consumer);
00263 this->connected_ = 1;
00264 }
00265
00266
00267 this->event_channel_->connected (this);
00268 }
00269
00270 CosEventComm::PullConsumer_ptr
00271 TAO_CEC_ProxyPullSupplier::apply_policy (CosEventComm::PullConsumer_ptr pre)
00272 {
00273 if (CORBA::is_nil (pre)) return pre;
00274 this->nopolicy_consumer_ = CosEventComm::PullConsumer::_duplicate (pre);
00275 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00276 CosEventComm::PullConsumer_var post =
00277 CosEventComm::PullConsumer::_duplicate (pre);
00278 if (this->timeout_ > ACE_Time_Value::zero)
00279 {
00280 CORBA::PolicyList policy_list;
00281 policy_list.length (1);
00282 policy_list[0] = this->event_channel_->
00283 create_roundtrip_timeout_policy (this->timeout_);
00284
00285 CORBA::Object_var post_obj = pre->_set_policy_overrides
00286 (policy_list, CORBA::ADD_OVERRIDE);
00287 post = CosEventComm::PullConsumer::_narrow(post_obj.in ());
00288
00289 policy_list[0]->destroy ();
00290 policy_list.length (0);
00291 }
00292 return post._retn ();
00293 #else
00294 return CosEventComm::PullConsumer::_duplicate (pre);
00295 #endif
00296 }
00297
00298 void
00299 TAO_CEC_ProxyPullSupplier::disconnect_pull_supplier ()
00300 {
00301 CosEventComm::PullConsumer_var consumer;
00302
00303 {
00304 ACE_GUARD_THROW_EX (
00305 ACE_Lock, ace_mon, *this->lock_,
00306 CORBA::INTERNAL ());
00307
00308
00309 if (this->is_connected_i () == 0)
00310 throw CORBA::BAD_INV_ORDER ();
00311
00312 consumer = this->consumer_._retn ();
00313
00314 this->cleanup_i ();
00315 }
00316
00317
00318 this->event_channel_->disconnected (this);
00319
00320 if (CORBA::is_nil (consumer.in ()))
00321 return;
00322
00323 if (this->event_channel_->disconnect_callbacks ())
00324 {
00325 try
00326 {
00327 consumer->disconnect_pull_consumer ();
00328 }
00329 catch (const CORBA::Exception& ex)
00330 {
00331
00332
00333 ex._tao_print_exception ("ProxySupplier::disconnect_pull_supplier");
00334 }
00335 }
00336 }
00337
00338 PortableServer::POA_ptr
00339 TAO_CEC_ProxyPullSupplier::_default_POA (void)
00340 {
00341 return PortableServer::POA::_duplicate (this->default_POA_.in ());
00342 }
00343
00344 void
00345 TAO_CEC_ProxyPullSupplier::_add_ref (void)
00346 {
00347 this->_incr_refcnt ();
00348 }
00349
00350 void
00351 TAO_CEC_ProxyPullSupplier::_remove_ref (void)
00352 {
00353 this->_decr_refcnt ();
00354 }
00355
00356 TAO_END_VERSIONED_NAMESPACE_DECL