00001
00002
00003
00004
00005 #include "orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.h"
00006 #include "orbsvcs/CosEvent/CEC_TypedEventChannel.h"
00007 #include "orbsvcs/CosEvent/CEC_TypedConsumerAdmin.h"
00008 #include "orbsvcs/CosEvent/CEC_DynamicImplementation.h"
00009 #include "tao/debug.h"
00010
00011 #if ! defined (__ACE_INLINE__)
00012 #include "orbsvcs/CosEvent/CEC_TypedProxyPushConsumer.inl"
00013 #endif
00014
00015 #include "ace/Reverse_Lock_T.h"
00016
00017 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00018
00019 typedef ACE_Reverse_Lock<ACE_Lock> TAO_CEC_Unlock; // Reverse-lock adapter over ACE_Lock; connect_push_supplier() guards with it around the disconnected() upcall (presumably so the proxy lock is dropped for that scope -- confirm against the ACE_Reverse_Lock documentation).
00020
00021
00022 TAO_CEC_TypedProxyPushConsumer::TAO_CEC_TypedProxyPushConsumer
00023 (TAO_CEC_TypedEventChannel* ec, const ACE_Time_Value &timeout)
00024 : typed_event_channel_ (ec),
00025 timeout_ (timeout),
00026 refcount_ (1),
00027 connected_ (0)
00028 {
00029 this->lock_ =
00030 this->typed_event_channel_->create_consumer_lock ();
00031
00032 this->default_POA_ =
00033 this->typed_event_channel_->typed_consumer_poa ();
00034
00035 this->typed_event_channel_->get_servant_retry_map ().bind (this, 0);
00036
00037
00038 if (TAO_debug_level >= 10)
00039 {
00040 ACE_DEBUG ((LM_DEBUG,
00041 ACE_TEXT ("***** Initializing the DSI for the new TypedProxyPushConsumer *****\n")));
00042 }
00043
00044 this->dsi_impl_ = new
00045 TAO_CEC_DynamicImplementationServer (this->default_POA_.in(),
00046 this,
00047 this->typed_event_channel_);
00048
00049 try{
00050
00051
00052 this->oid_ =
00053 this->default_POA_->activate_object (this->dsi_impl_);
00054 }
00055 catch (...){
00056 }
00057 }
00058
00059
00060 TAO_CEC_TypedProxyPushConsumer::~TAO_CEC_TypedProxyPushConsumer (void)
00061 {
00062 try{
00063 this->default_POA_->deactivate_object (this->oid_.in ());
00064 }
00065 catch (...){
00066 }
00067
00068 delete dsi_impl_;
00069
00070 this->typed_event_channel_->get_servant_retry_map ().unbind (this);
00071 this->typed_event_channel_->destroy_consumer_lock (this->lock_);
00072 }
00073
00074 void
00075 TAO_CEC_TypedProxyPushConsumer::activate (
00076 CosTypedEventChannelAdmin::TypedProxyPushConsumer_ptr &activated_proxy)
00077 {
00078 CosTypedEventChannelAdmin::TypedProxyPushConsumer_var result;
00079 try
00080 {
00081 result = this->_this ();
00082 }
00083 catch (const CORBA::Exception&)
00084 {
00085 result = CosTypedEventChannelAdmin::TypedProxyPushConsumer::_nil ();
00086 }
00087 activated_proxy = result._retn ();
00088 }
00089
00090 void
00091 TAO_CEC_TypedProxyPushConsumer::deactivate (void)
00092 {
00093 try
00094 {
00095 PortableServer::POA_var poa =
00096 this->_default_POA ();
00097 PortableServer::ObjectId_var id =
00098 poa->servant_to_id (this);
00099 poa->deactivate_object (id.in ());
00100 }
00101 catch (const CORBA::Exception&)
00102 {
00103
00104
00105
00106
00107 }
00108 }
00109
00110 CORBA::Boolean
00111 TAO_CEC_TypedProxyPushConsumer::supplier_non_existent (
00112 CORBA::Boolean_out disconnected)
00113 {
00114 CORBA::Object_var supplier;
00115 {
00116 ACE_GUARD_THROW_EX (
00117 ACE_Lock, ace_mon, *this->lock_,
00118 CORBA::INTERNAL ());
00119
00120 disconnected = 0;
00121 if (this->is_connected_i () == 0)
00122 {
00123 disconnected = 1;
00124 return 0;
00125 }
00126 if (CORBA::is_nil (this->nopolicy_typed_supplier_.in ()))
00127 {
00128 return 0;
00129 }
00130 supplier = CORBA::Object::_duplicate
00131 (this->nopolicy_typed_supplier_.in ());
00132 }
00133
00134 #if (TAO_HAS_MINIMUM_CORBA == 0)
00135 return supplier->_non_existent ();
00136 #else
00137 return 0;
00138 #endif
00139 }
00140
00141 void
00142 TAO_CEC_TypedProxyPushConsumer::shutdown (void)
00143 {
00144 CosEventComm::PushSupplier_var supplier;
00145
00146 {
00147 ACE_GUARD_THROW_EX (
00148 ACE_Lock, ace_mon, *this->lock_,
00149 CORBA::INTERNAL ());
00150
00151
00152 supplier = this->typed_supplier_._retn ();
00153 this->connected_ = 0;
00154 }
00155
00156 this->deactivate ();
00157
00158 if (CORBA::is_nil (supplier.in ()))
00159 return;
00160
00161 try
00162 {
00163 supplier->disconnect_push_supplier ();
00164 }
00165 catch (const CORBA::Exception&)
00166 {
00167
00168
00169 }
00170 }
00171
00172 void
00173 TAO_CEC_TypedProxyPushConsumer::cleanup_i (void)
00174 {
00175 this->typed_supplier_ =
00176 CosEventComm::PushSupplier::_nil ();
00177 this->connected_ = 0;
00178 }
00179
00180 CORBA::ULong
00181 TAO_CEC_TypedProxyPushConsumer::_incr_refcnt (void)
00182 {
00183 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00184 return this->refcount_++;
00185 }
00186
00187 CORBA::ULong
00188 TAO_CEC_TypedProxyPushConsumer::_decr_refcnt (void)
00189 {
00190 {
00191 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00192 this->refcount_--;
00193 if (this->refcount_ != 0)
00194 return this->refcount_;
00195 }
00196
00197
00198 this->typed_event_channel_->destroy_proxy (this);
00199 return 0;
00200 }
00201
00202 void
00203 TAO_CEC_TypedProxyPushConsumer::connect_push_supplier (
00204 CosEventComm::PushSupplier_ptr push_supplier)
00205 {
00206 {
00207 ACE_GUARD_THROW_EX (
00208 ACE_Lock, ace_mon, *this->lock_,
00209 CORBA::INTERNAL ());
00210
00211
00212 if (this->is_connected_i ())
00213 {
00214 if (this->typed_event_channel_->supplier_reconnect () == 0)
00215 throw CosEventChannelAdmin::AlreadyConnected ();
00216
00217
00218
00219 this->cleanup_i ();
00220
00221
00222
00223 TAO_CEC_Unlock reverse_lock (*this->lock_);
00224
00225 {
00226 ACE_GUARD_THROW_EX (
00227 TAO_CEC_Unlock, ace_mon, reverse_lock,
00228 CORBA::INTERNAL ());
00229
00230
00231 this->typed_event_channel_->disconnected (this);
00232 }
00233
00234
00235 if (this->is_connected_i ())
00236 return;
00237 }
00238 this->typed_supplier_ = apply_policy (push_supplier);
00239 this->connected_ = 1;
00240 }
00241
00242
00243 this->typed_event_channel_->connected (this);
00244 }
00245
00246 CosEventComm::PushSupplier_ptr
00247 TAO_CEC_TypedProxyPushConsumer::apply_policy
00248 (CosEventComm::PushSupplier_ptr pre)
00249 {
00250 if (CORBA::is_nil(pre)) return pre;
00251 this->nopolicy_typed_supplier_ =
00252 CosEventComm::PushSupplier::_duplicate (pre);
00253 CosEventComm::PushSupplier_var post =
00254 CosEventComm::PushSupplier::_duplicate (pre);
00255 if (this->timeout_ > ACE_Time_Value::zero)
00256 {
00257 CORBA::PolicyList policy_list;
00258 policy_list.length (1);
00259 policy_list[0] = this->typed_event_channel_->
00260 create_roundtrip_timeout_policy (this->timeout_);
00261
00262 CORBA::Object_var post_obj = pre->_set_policy_overrides
00263 (policy_list, CORBA::ADD_OVERRIDE);
00264 post = CosEventComm::PushSupplier::_narrow(post_obj.in ());
00265
00266 policy_list[0]->destroy ();
00267 policy_list.length (0);
00268 }
00269 return post._retn ();
00270 }
00271
00272 void
00273 TAO_CEC_TypedProxyPushConsumer::push (const CORBA::Any& )
00274 {
00275 throw CORBA::NO_IMPLEMENT ();
00276 }
00277
00278 void
00279 TAO_CEC_TypedProxyPushConsumer::disconnect_push_consumer ()
00280 {
00281 CosEventComm::PushSupplier_var supplier;
00282
00283 {
00284 ACE_GUARD_THROW_EX (
00285 ACE_Lock, ace_mon, *this->lock_,
00286 CORBA::INTERNAL ());
00287
00288
00289 if (this->is_connected_i () == 0)
00290 throw CORBA::BAD_INV_ORDER ();
00291
00292 supplier = this->typed_supplier_._retn ();
00293
00294 this->cleanup_i ();
00295 }
00296
00297
00298 this->typed_event_channel_->disconnected (this);
00299
00300 if (!CORBA::is_nil (supplier.in ()))
00301 {
00302 if (this->typed_event_channel_->disconnect_callbacks ())
00303 {
00304 try
00305 {
00306 supplier->disconnect_push_supplier ();
00307 }
00308 catch (const CORBA::Exception&)
00309 {
00310
00311
00312 }
00313 }
00314 }
00315 }
00316
00317 PortableServer::POA_ptr
00318 TAO_CEC_TypedProxyPushConsumer::_default_POA (void)
00319 {
00320 return PortableServer::POA::_duplicate (this->default_POA_.in ());
00321 }
00322
00323 void
00324 TAO_CEC_TypedProxyPushConsumer::_add_ref (void)
00325 {
00326 this->_incr_refcnt ();
00327 }
00328
00329 void
00330 TAO_CEC_TypedProxyPushConsumer::_remove_ref (void)
00331 {
00332 this->_decr_refcnt ();
00333 }
00334
00335 CORBA::Object_ptr
00336 TAO_CEC_TypedProxyPushConsumer::get_typed_consumer (void)
00337
00338 {
00339 CORBA::Object_var server =
00340 default_POA_->id_to_reference (this->oid_.in ());
00341
00342 return CORBA::Object::_duplicate (server.in());
00343 }
00344
00345 void
00346 TAO_CEC_TypedProxyPushConsumer::invoke (const TAO_CEC_TypedEvent& typed_event)
00347 {
00348 TAO_CEC_TypedProxyPushConsumer_Guard ace_mon (this->lock_,
00349 this->refcount_,
00350 this->typed_event_channel_,
00351 this);
00352 if (!ace_mon.locked ())
00353 return;
00354
00355 this->typed_event_channel_->typed_consumer_admin ()->invoke (typed_event);
00356 }
00357
00358
00359
00360 TAO_CEC_TypedProxyPushConsumer_Guard::
00361 TAO_CEC_TypedProxyPushConsumer_Guard (ACE_Lock *lock,
00362 CORBA::ULong &refcount,
00363 TAO_CEC_TypedEventChannel *ec,
00364 TAO_CEC_TypedProxyPushConsumer *proxy)
00365 : lock_ (lock),
00366 refcount_ (refcount),
00367 typed_event_channel_ (ec),
00368 proxy_ (proxy),
00369 locked_ (0)
00370 {
00371 ACE_Guard<ACE_Lock> ace_mon (*this->lock_);
00372
00373
00374
00375
00376
00377
00378
00379 if (proxy->is_connected_i () == 0)
00380 return;
00381
00382 this->locked_ = 1;
00383 this->refcount_++;
00384 }
00385
00386 TAO_CEC_TypedProxyPushConsumer_Guard::
00387 ~TAO_CEC_TypedProxyPushConsumer_Guard (void)
00388 {
00389
00390
00391 if (!this->locked_)
00392 return;
00393
00394 {
00395 ACE_Guard<ACE_Lock> ace_mon (*this->lock_);
00396
00397
00398
00399
00400
00401
00402
00403 this->refcount_--;
00404 if (this->refcount_ != 0)
00405 return;
00406 }
00407 this->typed_event_channel_->destroy_proxy (this->proxy_);
00408 }
00409
00410 TAO_END_VERSIONED_NAMESPACE_DECL