00001
00002
00003 #include "orbsvcs/CosEvent/CEC_ProxyPushConsumer.h"
00004 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00005 #include "orbsvcs/CosEvent/CEC_ConsumerAdmin.h"
00006 #include "orbsvcs/CosEvent/CEC_ProxyPushSupplier.h"
00007
00008 #include "ace/Reverse_Lock_T.h"
00009
00010 #if ! defined (__ACE_INLINE__)
00011 #include "orbsvcs/CosEvent/CEC_ProxyPushConsumer.inl"
00012 #endif
00013
00014 ACE_RCSID (CosEvent,
00015 CEC_ProxyPushConsumer,
00016 "$Id: CEC_ProxyPushConsumer.cpp 77001 2007-02-12 07:54:49Z johnnyw $")
00017
00018 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00019
00020 typedef ACE_Reverse_Lock<ACE_Lock> TAO_CEC_Unlock;
00021
00022 TAO_CEC_ProxyPushConsumer::
00023 TAO_CEC_ProxyPushConsumer (TAO_CEC_EventChannel* ec,
00024 const ACE_Time_Value &timeout)
00025 : event_channel_ (ec),
00026 timeout_(timeout),
00027 refcount_ (1),
00028 connected_ (false)
00029 {
00030 this->lock_ =
00031 this->event_channel_->create_consumer_lock ();
00032
00033 this->default_POA_ =
00034 this->event_channel_->consumer_poa ();
00035
00036 this->event_channel_->get_servant_retry_map ().bind (this, 0);
00037 }
00038
00039 TAO_CEC_ProxyPushConsumer::~TAO_CEC_ProxyPushConsumer (void)
00040 {
00041 this->event_channel_->get_servant_retry_map ().unbind (this);
00042 this->event_channel_->destroy_consumer_lock (this->lock_);
00043 }
00044
00045 void
00046 TAO_CEC_ProxyPushConsumer::activate (
00047 CosEventChannelAdmin::ProxyPushConsumer_ptr &activated_proxy)
00048 {
00049 CosEventChannelAdmin::ProxyPushConsumer_var result;
00050 try
00051 {
00052 result = this->_this ();
00053 }
00054 catch (const CORBA::Exception&)
00055 {
00056 result = CosEventChannelAdmin::ProxyPushConsumer::_nil ();
00057 }
00058 activated_proxy = result._retn ();
00059 }
00060
00061 void
00062 TAO_CEC_ProxyPushConsumer::deactivate (void)
00063 {
00064 try
00065 {
00066 PortableServer::POA_var poa =
00067 this->_default_POA ();
00068 PortableServer::ObjectId_var id =
00069 poa->servant_to_id (this);
00070 poa->deactivate_object (id.in ());
00071 }
00072 catch (const CORBA::Exception&)
00073 {
00074
00075
00076
00077
00078 }
00079 }
00080
00081 CORBA::Boolean
00082 TAO_CEC_ProxyPushConsumer::supplier_non_existent (
00083 CORBA::Boolean_out disconnected)
00084 {
00085 CORBA::Object_var supplier;
00086 {
00087 ACE_GUARD_THROW_EX (
00088 ACE_Lock, ace_mon, *this->lock_,
00089 CORBA::INTERNAL ());
00090
00091 disconnected = false;
00092 if (!this->is_connected_i ())
00093 {
00094 disconnected = true;
00095 return false;
00096 }
00097 if (CORBA::is_nil (this->nopolicy_supplier_.in ()))
00098 {
00099 return false;
00100 }
00101 supplier = CORBA::Object::_duplicate (this->nopolicy_supplier_.in ());
00102 }
00103
00104 #if (TAO_HAS_MINIMUM_CORBA == 0)
00105 return supplier->_non_existent ();
00106 #else
00107 return false;
00108 #endif
00109 }
00110
00111 void
00112 TAO_CEC_ProxyPushConsumer::shutdown (void)
00113 {
00114 CosEventComm::PushSupplier_var supplier;
00115
00116 {
00117 ACE_GUARD_THROW_EX (
00118 ACE_Lock, ace_mon, *this->lock_,
00119 CORBA::INTERNAL ());
00120
00121
00122 supplier = this->supplier_._retn ();
00123 this->connected_ = false;
00124 }
00125
00126 this->deactivate ();
00127
00128 if (CORBA::is_nil (supplier.in ()))
00129 return;
00130
00131 try
00132 {
00133 supplier->disconnect_push_supplier ();
00134 }
00135 catch (const CORBA::Exception&)
00136 {
00137
00138
00139 }
00140 }
00141
00142 void
00143 TAO_CEC_ProxyPushConsumer::cleanup_i (void)
00144 {
00145 this->supplier_ =
00146 CosEventComm::PushSupplier::_nil ();
00147 this->connected_ = false;
00148 }
00149
00150 CORBA::ULong
00151 TAO_CEC_ProxyPushConsumer::_incr_refcnt (void)
00152 {
00153 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00154 return this->refcount_++;
00155 }
00156
00157 CORBA::ULong
00158 TAO_CEC_ProxyPushConsumer::_decr_refcnt (void)
00159 {
00160 {
00161 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00162 --this->refcount_;
00163 if (this->refcount_ != 0)
00164 return this->refcount_;
00165 }
00166
00167
00168 this->event_channel_->destroy_proxy (this);
00169 return 0;
00170 }
00171
00172 void
00173 TAO_CEC_ProxyPushConsumer::connect_push_supplier (
00174 CosEventComm::PushSupplier_ptr push_supplier)
00175 {
00176 {
00177 ACE_GUARD_THROW_EX (
00178 ACE_Lock, ace_mon, *this->lock_,
00179 CORBA::INTERNAL ());
00180
00181
00182 if (this->is_connected_i ())
00183 {
00184 if (this->event_channel_->supplier_reconnect () == 0)
00185 throw CosEventChannelAdmin::AlreadyConnected ();
00186
00187
00188
00189 this->cleanup_i ();
00190
00191
00192
00193 TAO_CEC_Unlock reverse_lock (*this->lock_);
00194
00195 {
00196 ACE_GUARD_THROW_EX (
00197 TAO_CEC_Unlock, ace_mon, reverse_lock,
00198 CORBA::INTERNAL ());
00199
00200
00201 this->event_channel_->disconnected (this);
00202 }
00203
00204
00205 if (this->is_connected_i ())
00206 return;
00207 }
00208 this->supplier_ = apply_policy (push_supplier);
00209 this->connected_ = true;
00210 }
00211
00212
00213 this->event_channel_->connected (this);
00214 }
00215
00216 CosEventComm::PushSupplier_ptr
00217 TAO_CEC_ProxyPushConsumer::apply_policy (CosEventComm::PushSupplier_ptr pre)
00218 {
00219 if (CORBA::is_nil (pre)) return pre;
00220 this->nopolicy_supplier_ = CosEventComm::PushSupplier::_duplicate (pre);
00221 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00222 CosEventComm::PushSupplier_var post =
00223 CosEventComm::PushSupplier::_duplicate (pre);
00224 if (this->timeout_ > ACE_Time_Value::zero)
00225 {
00226 CORBA::PolicyList policy_list;
00227 policy_list.length (1);
00228 policy_list[0] = this->event_channel_->
00229 create_roundtrip_timeout_policy (this->timeout_);
00230
00231 CORBA::Object_var post_obj = pre->_set_policy_overrides
00232 (policy_list, CORBA::ADD_OVERRIDE);
00233 post = CosEventComm::PushSupplier::_narrow(post_obj.in ());
00234
00235 policy_list[0]->destroy ();
00236 policy_list.length (0);
00237 }
00238 return post._retn ();
00239 #else
00240 return CosEventComm::PushSupplier::_duplicate (pre);
00241 #endif
00242 }
00243
00244 void
00245 TAO_CEC_ProxyPushConsumer::push (const CORBA::Any& event)
00246 {
00247 TAO_CEC_ProxyPushConsumer_Guard ace_mon (this->lock_,
00248 this->refcount_,
00249 this->event_channel_,
00250 this);
00251 if (!ace_mon.locked ())
00252 return;
00253
00254 this->event_channel_->consumer_admin ()->push (event);
00255 }
00256
00257 void
00258 TAO_CEC_ProxyPushConsumer::disconnect_push_consumer ()
00259 {
00260 CosEventComm::PushSupplier_var supplier;
00261
00262 {
00263 ACE_GUARD_THROW_EX (
00264 ACE_Lock, ace_mon, *this->lock_,
00265 CORBA::INTERNAL ());
00266
00267
00268 if (!this->is_connected_i ())
00269 throw CORBA::BAD_INV_ORDER ();
00270
00271 supplier = this->supplier_._retn ();
00272
00273 this->cleanup_i ();
00274 }
00275
00276
00277 this->event_channel_->disconnected (this);
00278
00279 if (CORBA::is_nil (supplier.in ()))
00280 return;
00281
00282 if (this->event_channel_->disconnect_callbacks ())
00283 {
00284 try
00285 {
00286 supplier->disconnect_push_supplier ();
00287 }
00288 catch (const CORBA::Exception&)
00289 {
00290
00291
00292 }
00293 }
00294 }
00295
00296 PortableServer::POA_ptr
00297 TAO_CEC_ProxyPushConsumer::_default_POA (void)
00298 {
00299 return PortableServer::POA::_duplicate (this->default_POA_.in ());
00300 }
00301
00302 void
00303 TAO_CEC_ProxyPushConsumer::_add_ref (void)
00304 {
00305 this->_incr_refcnt ();
00306 }
00307
00308 void
00309 TAO_CEC_ProxyPushConsumer::_remove_ref (void)
00310 {
00311 this->_decr_refcnt ();
00312 }
00313
00314
00315
00316 TAO_CEC_ProxyPushConsumer_Guard::
00317 TAO_CEC_ProxyPushConsumer_Guard (ACE_Lock *lock,
00318 CORBA::ULong &refcount,
00319 TAO_CEC_EventChannel *ec,
00320 TAO_CEC_ProxyPushConsumer *proxy)
00321 : lock_ (lock),
00322 refcount_ (refcount),
00323 event_channel_ (ec),
00324 proxy_ (proxy),
00325 locked_ (false)
00326 {
00327 ACE_Guard<ACE_Lock> ace_mon (*this->lock_);
00328
00329
00330
00331
00332
00333
00334
00335 if (!proxy->is_connected_i ())
00336 return;
00337
00338 this->locked_ = true;
00339 ++this->refcount_;
00340 }
00341
00342 TAO_CEC_ProxyPushConsumer_Guard::
00343 ~TAO_CEC_ProxyPushConsumer_Guard (void)
00344 {
00345
00346
00347 if (!this->locked_)
00348 return;
00349
00350 {
00351 ACE_Guard<ACE_Lock> ace_mon (*this->lock_);
00352
00353
00354
00355
00356
00357
00358
00359 --this->refcount_;
00360 if (this->refcount_ != 0)
00361 return;
00362 }
00363 this->event_channel_->destroy_proxy (this->proxy_);
00364 }
00365
00366 TAO_END_VERSIONED_NAMESPACE_DECL