00001
00002
00003 #include "orbsvcs/CosEvent/CEC_ProxyPullConsumer.h"
00004 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00005 #include "orbsvcs/CosEvent/CEC_ConsumerAdmin.h"
00006 #include "orbsvcs/CosEvent/CEC_SupplierControl.h"
00007 #include "orbsvcs/CosEvent/CEC_ProxyPullSupplier.h"
00008
00009 #include "ace/Reverse_Lock_T.h"
00010
00011 #if ! defined (__ACE_INLINE__)
00012 #include "orbsvcs/CosEvent/CEC_ProxyPullConsumer.inl"
00013 #endif
00014
00015 ACE_RCSID (CosEvent,
00016 CEC_ProxyPullConsumer,
00017 "$Id: CEC_ProxyPullConsumer.cpp 77001 2007-02-12 07:54:49Z johnnyw $")
00018
00019 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00020
00021 typedef ACE_Reverse_Lock<ACE_Lock> TAO_CEC_Unlock;
00022
00023 TAO_CEC_ProxyPullConsumer::
00024 TAO_CEC_ProxyPullConsumer (TAO_CEC_EventChannel* ec,
00025 const ACE_Time_Value &timeout)
00026 : event_channel_ (ec),
00027 timeout_ (timeout),
00028 refcount_ (1)
00029 {
00030 this->lock_ =
00031 this->event_channel_->create_consumer_lock ();
00032
00033 this->default_POA_ =
00034 this->event_channel_->consumer_poa ();
00035
00036 this->event_channel_->get_servant_retry_map ().bind (this, 0);
00037 }
00038
00039 TAO_CEC_ProxyPullConsumer::~TAO_CEC_ProxyPullConsumer (void)
00040 {
00041 this->event_channel_->get_servant_retry_map ().unbind (this);
00042 this->event_channel_->destroy_consumer_lock (this->lock_);
00043 }
00044
00045 void
00046 TAO_CEC_ProxyPullConsumer::activate (
00047 CosEventChannelAdmin::ProxyPullConsumer_ptr &activated_proxy)
00048 {
00049 CosEventChannelAdmin::ProxyPullConsumer_var result;
00050 try
00051 {
00052 result = this->_this ();
00053 }
00054 catch (const CORBA::Exception&)
00055 {
00056 result = CosEventChannelAdmin::ProxyPullConsumer::_nil ();
00057 }
00058 activated_proxy = result._retn ();
00059 }
00060
00061 void
00062 TAO_CEC_ProxyPullConsumer::deactivate (void)
00063 {
00064 try
00065 {
00066 PortableServer::POA_var poa =
00067 this->_default_POA ();
00068 PortableServer::ObjectId_var id =
00069 poa->servant_to_id (this);
00070 poa->deactivate_object (id.in ());
00071 }
00072 catch (const CORBA::Exception&)
00073 {
00074
00075
00076
00077
00078 }
00079 }
00080
00081
00082
00083
00084
00085
00086 CORBA::Any*
00087 TAO_CEC_ProxyPullConsumer::try_pull_from_supplier (
00088 CORBA::Boolean_out has_event)
00089 {
00090 has_event = 0;
00091 CosEventComm::PullSupplier_var supplier;
00092 {
00093 ACE_GUARD_THROW_EX (
00094 ACE_Lock, ace_mon, *this->lock_,
00095 CORBA::INTERNAL ());
00096
00097
00098 if (this->is_connected_i () == 0)
00099 return 0;
00100
00101 supplier =
00102 CosEventComm::PullSupplier::_duplicate (this->supplier_.in ());
00103
00104
00105
00106 }
00107
00108 CORBA::Any_var any;
00109 TAO_CEC_SupplierControl *control =
00110 this->event_channel_->supplier_control ();
00111
00112 try
00113 {
00114 any = supplier->try_pull (has_event);
00115
00116
00117 control->successful_transmission(this);
00118 }
00119 catch (const CORBA::OBJECT_NOT_EXIST&)
00120 {
00121 control->supplier_not_exist (this);
00122 }
00123 catch (CORBA::SystemException& sysex)
00124 {
00125 control->system_exception (this,
00126 sysex);
00127 }
00128 catch (const CORBA::Exception&)
00129 {
00130
00131 }
00132 return any._retn ();
00133 }
00134
00135 CORBA::Any*
00136 TAO_CEC_ProxyPullConsumer::pull_from_supplier ()
00137 {
00138 CosEventComm::PullSupplier_var supplier;
00139 {
00140 ACE_GUARD_THROW_EX (
00141 ACE_Lock, ace_mon, *this->lock_,
00142 CORBA::INTERNAL ());
00143
00144
00145 if (this->is_connected_i () == 0)
00146 return 0;
00147
00148 supplier =
00149 CosEventComm::PullSupplier::_duplicate (this->supplier_.in ());
00150
00151
00152
00153 }
00154
00155 CORBA::Any_var any;
00156 try
00157 {
00158 any = supplier->pull ();
00159 }
00160 catch (const CORBA::Exception&)
00161 {
00162
00163
00164 }
00165 return any._retn ();
00166 }
00167
00168 CORBA::Boolean
00169 TAO_CEC_ProxyPullConsumer::supplier_non_existent (
00170 CORBA::Boolean_out disconnected)
00171 {
00172 CORBA::Object_var supplier;
00173 {
00174 ACE_GUARD_THROW_EX (
00175 ACE_Lock, ace_mon, *this->lock_,
00176 CORBA::INTERNAL ());
00177
00178 disconnected = 0;
00179 if (this->is_connected_i () == 0)
00180 {
00181 disconnected = 1;
00182 return 0;
00183 }
00184 if (CORBA::is_nil (this->nopolicy_supplier_.in ()))
00185 {
00186 return 0;
00187 }
00188 supplier = CORBA::Object::_duplicate (this->nopolicy_supplier_.in ());
00189 }
00190
00191 #if (TAO_HAS_MINIMUM_CORBA == 0)
00192 return supplier->_non_existent ();
00193 #else
00194 return 0;
00195 #endif
00196 }
00197
00198 void
00199 TAO_CEC_ProxyPullConsumer::shutdown (void)
00200 {
00201 CosEventComm::PullSupplier_var supplier;
00202
00203 {
00204 ACE_GUARD_THROW_EX (
00205 ACE_Lock, ace_mon, *this->lock_,
00206 CORBA::INTERNAL ());
00207
00208
00209 supplier = this->supplier_._retn ();
00210 }
00211
00212 this->deactivate ();
00213
00214 if (CORBA::is_nil (supplier.in ()))
00215 return;
00216
00217 try
00218 {
00219 supplier->disconnect_pull_supplier ();
00220 }
00221 catch (const CORBA::Exception&)
00222 {
00223
00224
00225 }
00226 }
00227
00228 void
00229 TAO_CEC_ProxyPullConsumer::cleanup_i (void)
00230 {
00231 this->supplier_ =
00232 CosEventComm::PullSupplier::_nil ();
00233 }
00234
00235 CORBA::ULong
00236 TAO_CEC_ProxyPullConsumer::_incr_refcnt (void)
00237 {
00238 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00239 return this->refcount_++;
00240 }
00241
00242 CORBA::ULong
00243 TAO_CEC_ProxyPullConsumer::_decr_refcnt (void)
00244 {
00245 {
00246 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00247 this->refcount_--;
00248 if (this->refcount_ != 0)
00249 return this->refcount_;
00250 }
00251
00252
00253 this->event_channel_->destroy_proxy (this);
00254 return 0;
00255 }
00256
00257 void
00258 TAO_CEC_ProxyPullConsumer::connect_pull_supplier (
00259 CosEventComm::PullSupplier_ptr pull_supplier)
00260 {
00261
00262 if (CORBA::is_nil (pull_supplier))
00263 throw CORBA::BAD_PARAM ();
00264
00265 {
00266 ACE_GUARD_THROW_EX (
00267 ACE_Lock, ace_mon, *this->lock_,
00268 CORBA::INTERNAL ());
00269
00270
00271 if (this->is_connected_i ())
00272 {
00273 if (this->event_channel_->supplier_reconnect () == 0)
00274 throw CosEventChannelAdmin::AlreadyConnected ();
00275
00276
00277
00278 this->cleanup_i ();
00279
00280
00281
00282 TAO_CEC_Unlock reverse_lock (*this->lock_);
00283
00284 {
00285 ACE_GUARD_THROW_EX (
00286 TAO_CEC_Unlock, ace_mon, reverse_lock,
00287 CORBA::INTERNAL ());
00288
00289
00290 this->event_channel_->disconnected (this);
00291 }
00292
00293
00294 if (this->is_connected_i ())
00295 return;
00296 }
00297 this->supplier_ = apply_policy (pull_supplier);
00298 }
00299
00300
00301 this->event_channel_->connected (this);
00302 }
00303
00304 CosEventComm::PullSupplier_ptr
00305 TAO_CEC_ProxyPullConsumer::apply_policy (CosEventComm::PullSupplier_ptr pre)
00306 {
00307 this->nopolicy_supplier_ = CosEventComm::PullSupplier::_duplicate (pre);
00308 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00309 CosEventComm::PullSupplier_var post =
00310 CosEventComm::PullSupplier::_duplicate (pre);
00311 if (this->timeout_ > ACE_Time_Value::zero)
00312 {
00313 CORBA::PolicyList policy_list;
00314 policy_list.length (1);
00315 policy_list[0] = this->event_channel_->
00316 create_roundtrip_timeout_policy (this->timeout_);
00317
00318 CORBA::Object_var post_obj = pre->_set_policy_overrides
00319 (policy_list, CORBA::ADD_OVERRIDE);
00320 post = CosEventComm::PullSupplier::_narrow (post_obj.in ());
00321
00322 policy_list[0]->destroy ();
00323 policy_list.length (0);
00324 }
00325 return post._retn ();
00326 #else
00327 return CosEventComm::PullSupplier::_duplicate (pre);
00328 #endif
00329 }
00330
00331 void
00332 TAO_CEC_ProxyPullConsumer::disconnect_pull_consumer ()
00333 {
00334 CosEventComm::PullSupplier_var supplier;
00335
00336 {
00337 ACE_GUARD_THROW_EX (
00338 ACE_Lock, ace_mon, *this->lock_,
00339 CORBA::INTERNAL ());
00340
00341
00342 if (this->is_connected_i () == 0)
00343 throw CORBA::BAD_INV_ORDER ();
00344
00345 supplier = this->supplier_._retn ();
00346
00347 this->cleanup_i ();
00348 }
00349
00350
00351 this->event_channel_->disconnected (this);
00352
00353 if (this->event_channel_->disconnect_callbacks ())
00354 {
00355 try
00356 {
00357 supplier->disconnect_pull_supplier ();
00358 }
00359 catch (const CORBA::Exception&)
00360 {
00361
00362
00363 }
00364 }
00365 }
00366
00367 PortableServer::POA_ptr
00368 TAO_CEC_ProxyPullConsumer::_default_POA (void)
00369 {
00370 return PortableServer::POA::_duplicate (this->default_POA_.in ());
00371 }
00372
00373 void
00374 TAO_CEC_ProxyPullConsumer::_add_ref (void)
00375 {
00376 this->_incr_refcnt ();
00377 }
00378
00379 void
00380 TAO_CEC_ProxyPullConsumer::_remove_ref (void)
00381 {
00382 this->_decr_refcnt ();
00383 }
00384
00385 TAO_END_VERSIONED_NAMESPACE_DECL