00001
00002
00003 #include "orbsvcs/CosEvent/CEC_ProxyPullConsumer.h"
00004 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
00005 #include "orbsvcs/CosEvent/CEC_ConsumerAdmin.h"
00006 #include "orbsvcs/CosEvent/CEC_SupplierControl.h"
00007 #include "orbsvcs/CosEvent/CEC_ProxyPullSupplier.h"
00008
00009 #include "ace/Reverse_Lock_T.h"
00010
00011 #if ! defined (__ACE_INLINE__)
00012 #include "orbsvcs/CosEvent/CEC_ProxyPullConsumer.inl"
00013 #endif
00014
00015 ACE_RCSID (CosEvent,
00016 CEC_ProxyPullConsumer,
00017 "$Id: CEC_ProxyPullConsumer.cpp 80714 2008-02-24 19:45:27Z johnnyw $")
00018
00019 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00020
00021 typedef ACE_Reverse_Lock<ACE_Lock> TAO_CEC_Unlock;
00022
00023 TAO_CEC_ProxyPullConsumer::
00024 TAO_CEC_ProxyPullConsumer (TAO_CEC_EventChannel* ec,
00025 const ACE_Time_Value &timeout)
00026 : event_channel_ (ec),
00027 timeout_ (timeout),
00028 refcount_ (1)
00029 {
00030 this->lock_ =
00031 this->event_channel_->create_consumer_lock ();
00032
00033 this->default_POA_ =
00034 this->event_channel_->consumer_poa ();
00035
00036 this->event_channel_->get_servant_retry_map ().bind (this, 0);
00037 }
00038
00039 TAO_CEC_ProxyPullConsumer::~TAO_CEC_ProxyPullConsumer (void)
00040 {
00041 this->event_channel_->get_servant_retry_map ().unbind (this);
00042 this->event_channel_->destroy_consumer_lock (this->lock_);
00043 }
00044
00045 void
00046 TAO_CEC_ProxyPullConsumer::activate (
00047 CosEventChannelAdmin::ProxyPullConsumer_ptr &activated_proxy)
00048 {
00049 CosEventChannelAdmin::ProxyPullConsumer_var result;
00050 try
00051 {
00052 result = this->_this ();
00053 }
00054 catch (const CORBA::Exception&)
00055 {
00056 result = CosEventChannelAdmin::ProxyPullConsumer::_nil ();
00057 }
00058 activated_proxy = result._retn ();
00059 }
00060
00061 void
00062 TAO_CEC_ProxyPullConsumer::deactivate (void)
00063 {
00064 try
00065 {
00066 PortableServer::POA_var poa =
00067 this->_default_POA ();
00068 PortableServer::ObjectId_var id =
00069 poa->servant_to_id (this);
00070 poa->deactivate_object (id.in ());
00071 }
00072 catch (const CORBA::Exception&)
00073 {
00074
00075
00076
00077
00078 }
00079 }
00080
00081
00082
00083
00084
00085
00086 CORBA::Any*
00087 TAO_CEC_ProxyPullConsumer::try_pull_from_supplier (
00088 CORBA::Boolean_out has_event)
00089 {
00090 has_event = 0;
00091 CosEventComm::PullSupplier_var supplier;
00092 {
00093 ACE_GUARD_THROW_EX (
00094 ACE_Lock, ace_mon, *this->lock_,
00095 CORBA::INTERNAL ());
00096
00097
00098 if (this->is_connected_i () == 0)
00099 return 0;
00100
00101 supplier =
00102 CosEventComm::PullSupplier::_duplicate (this->supplier_.in ());
00103
00104
00105
00106 }
00107
00108 CORBA::Any_var any;
00109 TAO_CEC_SupplierControl *control =
00110 this->event_channel_->supplier_control ();
00111
00112 try
00113 {
00114 any = supplier->try_pull (has_event);
00115
00116
00117 control->successful_transmission(this);
00118 }
00119 catch (const CORBA::OBJECT_NOT_EXIST&)
00120 {
00121 control->supplier_not_exist (this);
00122 }
00123 catch (CORBA::SystemException& sysex)
00124 {
00125 control->system_exception (this, sysex);
00126 }
00127 catch (const CORBA::Exception&)
00128 {
00129
00130 }
00131 return any._retn ();
00132 }
00133
00134 CORBA::Any*
00135 TAO_CEC_ProxyPullConsumer::pull_from_supplier ()
00136 {
00137 CosEventComm::PullSupplier_var supplier;
00138 {
00139 ACE_GUARD_THROW_EX (
00140 ACE_Lock, ace_mon, *this->lock_,
00141 CORBA::INTERNAL ());
00142
00143
00144 if (this->is_connected_i () == 0)
00145 return 0;
00146
00147 supplier =
00148 CosEventComm::PullSupplier::_duplicate (this->supplier_.in ());
00149
00150
00151
00152 }
00153
00154 CORBA::Any_var any;
00155 try
00156 {
00157 any = supplier->pull ();
00158 }
00159 catch (const CORBA::Exception&)
00160 {
00161
00162
00163 }
00164 return any._retn ();
00165 }
00166
00167 CORBA::Boolean
00168 TAO_CEC_ProxyPullConsumer::supplier_non_existent (
00169 CORBA::Boolean_out disconnected)
00170 {
00171 CORBA::Object_var supplier;
00172 {
00173 ACE_GUARD_THROW_EX (
00174 ACE_Lock, ace_mon, *this->lock_,
00175 CORBA::INTERNAL ());
00176
00177 disconnected = 0;
00178 if (this->is_connected_i () == 0)
00179 {
00180 disconnected = 1;
00181 return 0;
00182 }
00183 if (CORBA::is_nil (this->nopolicy_supplier_.in ()))
00184 {
00185 return 0;
00186 }
00187 supplier = CORBA::Object::_duplicate (this->nopolicy_supplier_.in ());
00188 }
00189
00190 #if (TAO_HAS_MINIMUM_CORBA == 0)
00191 return supplier->_non_existent ();
00192 #else
00193 return 0;
00194 #endif
00195 }
00196
00197 void
00198 TAO_CEC_ProxyPullConsumer::shutdown (void)
00199 {
00200 CosEventComm::PullSupplier_var supplier;
00201
00202 {
00203 ACE_GUARD_THROW_EX (
00204 ACE_Lock, ace_mon, *this->lock_,
00205 CORBA::INTERNAL ());
00206
00207
00208 supplier = this->supplier_._retn ();
00209 }
00210
00211 this->deactivate ();
00212
00213 if (CORBA::is_nil (supplier.in ()))
00214 return;
00215
00216 try
00217 {
00218 supplier->disconnect_pull_supplier ();
00219 }
00220 catch (const CORBA::Exception&)
00221 {
00222
00223
00224 }
00225 }
00226
00227 void
00228 TAO_CEC_ProxyPullConsumer::cleanup_i (void)
00229 {
00230 this->supplier_ =
00231 CosEventComm::PullSupplier::_nil ();
00232 }
00233
00234 CORBA::ULong
00235 TAO_CEC_ProxyPullConsumer::_incr_refcnt (void)
00236 {
00237 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00238 return this->refcount_++;
00239 }
00240
00241 CORBA::ULong
00242 TAO_CEC_ProxyPullConsumer::_decr_refcnt (void)
00243 {
00244 {
00245 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00246 this->refcount_--;
00247 if (this->refcount_ != 0)
00248 return this->refcount_;
00249 }
00250
00251
00252 this->event_channel_->destroy_proxy (this);
00253 return 0;
00254 }
00255
00256 void
00257 TAO_CEC_ProxyPullConsumer::connect_pull_supplier (
00258 CosEventComm::PullSupplier_ptr pull_supplier)
00259 {
00260
00261 if (CORBA::is_nil (pull_supplier))
00262 throw CORBA::BAD_PARAM ();
00263
00264 {
00265 ACE_GUARD_THROW_EX (
00266 ACE_Lock, ace_mon, *this->lock_,
00267 CORBA::INTERNAL ());
00268
00269
00270 if (this->is_connected_i ())
00271 {
00272 if (this->event_channel_->supplier_reconnect () == 0)
00273 throw CosEventChannelAdmin::AlreadyConnected ();
00274
00275
00276
00277 this->cleanup_i ();
00278
00279
00280
00281 TAO_CEC_Unlock reverse_lock (*this->lock_);
00282
00283 {
00284 ACE_GUARD_THROW_EX (
00285 TAO_CEC_Unlock, ace_mon, reverse_lock,
00286 CORBA::INTERNAL ());
00287
00288
00289 this->event_channel_->disconnected (this);
00290 }
00291
00292
00293 if (this->is_connected_i ())
00294 return;
00295 }
00296 this->supplier_ = apply_policy (pull_supplier);
00297 }
00298
00299
00300 this->event_channel_->connected (this);
00301 }
00302
00303 CosEventComm::PullSupplier_ptr
00304 TAO_CEC_ProxyPullConsumer::apply_policy (CosEventComm::PullSupplier_ptr pre)
00305 {
00306 this->nopolicy_supplier_ = CosEventComm::PullSupplier::_duplicate (pre);
00307 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
00308 CosEventComm::PullSupplier_var post =
00309 CosEventComm::PullSupplier::_duplicate (pre);
00310 if (this->timeout_ > ACE_Time_Value::zero)
00311 {
00312 CORBA::PolicyList policy_list;
00313 policy_list.length (1);
00314 policy_list[0] = this->event_channel_->
00315 create_roundtrip_timeout_policy (this->timeout_);
00316
00317 CORBA::Object_var post_obj = pre->_set_policy_overrides
00318 (policy_list, CORBA::ADD_OVERRIDE);
00319 post = CosEventComm::PullSupplier::_narrow (post_obj.in ());
00320
00321 policy_list[0]->destroy ();
00322 policy_list.length (0);
00323 }
00324 return post._retn ();
00325 #else
00326 return CosEventComm::PullSupplier::_duplicate (pre);
00327 #endif
00328 }
00329
00330 void
00331 TAO_CEC_ProxyPullConsumer::disconnect_pull_consumer ()
00332 {
00333 CosEventComm::PullSupplier_var supplier;
00334
00335 {
00336 ACE_GUARD_THROW_EX (
00337 ACE_Lock, ace_mon, *this->lock_,
00338 CORBA::INTERNAL ());
00339
00340
00341 if (this->is_connected_i () == 0)
00342 throw CORBA::BAD_INV_ORDER ();
00343
00344 supplier = this->supplier_._retn ();
00345
00346 this->cleanup_i ();
00347 }
00348
00349
00350 this->event_channel_->disconnected (this);
00351
00352 if (this->event_channel_->disconnect_callbacks ())
00353 {
00354 try
00355 {
00356 supplier->disconnect_pull_supplier ();
00357 }
00358 catch (const CORBA::Exception&)
00359 {
00360
00361
00362 }
00363 }
00364 }
00365
00366 PortableServer::POA_ptr
00367 TAO_CEC_ProxyPullConsumer::_default_POA (void)
00368 {
00369 return PortableServer::POA::_duplicate (this->default_POA_.in ());
00370 }
00371
00372 void
00373 TAO_CEC_ProxyPullConsumer::_add_ref (void)
00374 {
00375 this->_incr_refcnt ();
00376 }
00377
00378 void
00379 TAO_CEC_ProxyPullConsumer::_remove_ref (void)
00380 {
00381 this->_decr_refcnt ();
00382 }
00383
00384 TAO_END_VERSIONED_NAMESPACE_DECL