00001
00002
00003 #include "orbsvcs/Event/EC_ProxySupplier.h"
00004 #include "orbsvcs/Event/EC_Dispatching.h"
00005 #include "orbsvcs/Event/EC_Filter_Builder.h"
00006 #include "orbsvcs/Event/EC_QOS_Info.h"
00007 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00008 #include "orbsvcs/Event/EC_Scheduling_Strategy.h"
00009 #include "orbsvcs/Event/EC_ConsumerControl.h"
00010 #include "orbsvcs/Event/EC_SupplierAdmin.h"
00011 #include "orbsvcs/ESF/ESF_RefCount_Guard.h"
00012 #include "orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h"
00013
00014 #include "ace/Reverse_Lock_T.h"
00015
00016 #if ! defined (__ACE_INLINE__)
00017 #include "orbsvcs/Event/EC_ProxySupplier.i"
00018 #endif
00019
00020 ACE_RCSID (Event,
00021 EC_ProxySupplier,
00022 "EC_ProxySupplier.cpp,v 1.69 2006/03/14 06:14:25 jtc Exp")
00023
00024 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00025
00026 typedef ACE_Reverse_Lock<ACE_Lock> TAO_EC_Unlock;
00027
00028 TAO_EC_ProxyPushSupplier::TAO_EC_ProxyPushSupplier (TAO_EC_Event_Channel_Base* ec, int validate_connection)
00029 : event_channel_ (ec),
00030 refcount_ (1),
00031 suspended_ (0),
00032 child_ (0),
00033 consumer_validate_connection_(validate_connection)
00034 {
00035 this->lock_ =
00036 this->event_channel_->create_supplier_lock ();
00037
00038 this->default_POA_ =
00039 this->event_channel_->supplier_poa ();
00040 }
00041
00042 TAO_EC_ProxyPushSupplier::~TAO_EC_ProxyPushSupplier (void)
00043 {
00044 this->event_channel_->destroy_supplier_lock (this->lock_);
00045 this->cleanup_i ();
00046 }
00047
00048 void
00049 TAO_EC_ProxyPushSupplier::connected (TAO_EC_ProxyPushConsumer* consumer
00050 ACE_ENV_ARG_DECL)
00051 {
00052 TAO_EC_Scheduling_Strategy *s =
00053 this->event_channel_->scheduling_strategy ();
00054
00055 s->add_proxy_supplier_dependencies (this,
00056 consumer
00057 ACE_ENV_ARG_PARAMETER);
00058 }
00059
00060 void
00061 TAO_EC_ProxyPushSupplier::reconnected (TAO_EC_ProxyPushConsumer* consumer
00062 ACE_ENV_ARG_DECL)
00063 {
00064 TAO_EC_Scheduling_Strategy *s =
00065 this->event_channel_->scheduling_strategy ();
00066
00067 s->add_proxy_supplier_dependencies (this,
00068 consumer
00069 ACE_ENV_ARG_PARAMETER);
00070 }
00071
00072 void
00073 TAO_EC_ProxyPushSupplier::disconnected (TAO_EC_ProxyPushConsumer*
00074 ACE_ENV_ARG_DECL_NOT_USED)
00075 {
00076 }
00077
00078 void
00079 TAO_EC_ProxyPushSupplier::connected (TAO_EC_ProxyPushSupplier*
00080 ACE_ENV_ARG_DECL_NOT_USED)
00081 {
00082 }
00083
00084 void
00085 TAO_EC_ProxyPushSupplier::reconnected (TAO_EC_ProxyPushSupplier*
00086 ACE_ENV_ARG_DECL_NOT_USED)
00087 {
00088 }
00089
00090 void
00091 TAO_EC_ProxyPushSupplier::disconnected (TAO_EC_ProxyPushSupplier*
00092 ACE_ENV_ARG_DECL_NOT_USED)
00093 {
00094 }
00095
00096 void
00097 TAO_EC_ProxyPushSupplier::shutdown (ACE_ENV_SINGLE_ARG_DECL)
00098 {
00099
00100
00101 RtecEventComm::PushConsumer_var consumer;
00102
00103 {
00104 ACE_GUARD_THROW_EX (
00105 ACE_Lock, ace_mon, *this->lock_,
00106 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00107 ACE_CHECK;
00108
00109 int connected = this->is_connected_i ();
00110
00111 consumer = this->consumer_._retn ();
00112
00113 if (connected)
00114 this->cleanup_i ();
00115 }
00116
00117 this->deactivate (ACE_ENV_SINGLE_ARG_PARAMETER);
00118 ACE_CHECK;
00119
00120 if (CORBA::is_nil (consumer.in ()))
00121 return;
00122
00123 ACE_TRY
00124 {
00125 consumer->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
00126 ACE_TRY_CHECK;
00127 }
00128 ACE_CATCHANY
00129 {
00130
00131
00132 }
00133 ACE_ENDTRY;
00134 }
00135
00136 void
00137 TAO_EC_ProxyPushSupplier::cleanup_i (void)
00138 {
00139 this->consumer_ =
00140 RtecEventComm::PushConsumer::_nil ();
00141
00142
00143
00144 delete this->child_;
00145 this->child_ = 0;
00146 }
00147
00148 void
00149 TAO_EC_ProxyPushSupplier::deactivate (ACE_ENV_SINGLE_ARG_DECL) ACE_THROW_SPEC (())
00150 {
00151 ACE_TRY
00152 {
00153 PortableServer::ObjectId id =
00154 this->object_id (ACE_ENV_SINGLE_ARG_PARAMETER);
00155 ACE_TRY_CHECK;
00156 this->default_POA_->deactivate_object (id ACE_ENV_ARG_PARAMETER);
00157 ACE_TRY_CHECK;
00158 }
00159 ACE_CATCHANY
00160 {
00161
00162
00163
00164
00165 }
00166 ACE_ENDTRY;
00167 }
00168
00169 CORBA::ULong
00170 TAO_EC_ProxyPushSupplier::_incr_refcnt (void)
00171 {
00172 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00173 return this->refcount_++;
00174 }
00175
00176 void
00177 TAO_EC_ProxyPushSupplier::refcount_zero_hook (void)
00178 {
00179
00180 this->event_channel_->destroy_proxy (this);
00181 }
00182
00183 CORBA::ULong
00184 TAO_EC_ProxyPushSupplier::_decr_refcnt (void)
00185 {
00186 {
00187 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00188 this->refcount_--;
00189 if (this->refcount_ != 0)
00190 return this->refcount_;
00191 }
00192
00193 this->refcount_zero_hook ();
00194 return 0;
00195 }
00196
00197 typedef TAO_ESF_Proxy_RefCount_Guard<TAO_EC_Event_Channel_Base,TAO_EC_ProxyPushSupplier> Destroy_Guard;
00198
00199 int
00200 TAO_EC_ProxyPushSupplier::filter (const RtecEventComm::EventSet& event,
00201 TAO_EC_QOS_Info& qos_info
00202 ACE_ENV_ARG_DECL)
00203 {
00204 Destroy_Guard auto_destroy (this->refcount_,
00205 this->event_channel_,
00206 this);
00207
00208 int result = 0;
00209 {
00210 ACE_GUARD_THROW_EX (
00211 ACE_Lock, ace_mon, *this->lock_,
00212 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00213 ACE_CHECK_RETURN (0);
00214
00215 if (this->is_connected_i () == 0)
00216 return 0;
00217
00218 result =
00219 this->child_->filter (event, qos_info ACE_ENV_ARG_PARAMETER);
00220 ACE_CHECK_RETURN (0);
00221 }
00222 return result;
00223 }
00224
00225 int
00226 TAO_EC_ProxyPushSupplier::filter_nocopy (RtecEventComm::EventSet& event,
00227 TAO_EC_QOS_Info& qos_info
00228 ACE_ENV_ARG_DECL)
00229 {
00230 Destroy_Guard auto_destroy (this->refcount_,
00231 this->event_channel_,
00232 this);
00233
00234 int result = 0;
00235 {
00236 ACE_GUARD_THROW_EX (
00237 ACE_Lock, ace_mon, *this->lock_,
00238 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00239 ACE_CHECK_RETURN (0);
00240
00241 if (this->is_connected_i () == 0)
00242 return 0;
00243
00244 result =
00245 this->child_->filter_nocopy (event, qos_info ACE_ENV_ARG_PARAMETER);
00246 ACE_CHECK_RETURN (0);
00247 }
00248 return result;
00249 }
00250
00251 void
00252 TAO_EC_ProxyPushSupplier::push (const RtecEventComm::EventSet& event,
00253 TAO_EC_QOS_Info& qos_info
00254 ACE_ENV_ARG_DECL)
00255 {
00256
00257
00258 if (this->is_connected_i () == 0)
00259 return;
00260
00261 if (this->suspended_ != 0)
00262 return;
00263
00264 TAO_ESF_RefCount_Guard<CORBA::ULong> ace_mon (this->refcount_);
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275 RtecEventComm::PushConsumer_var consumer =
00276 RtecEventComm::PushConsumer::_duplicate (this->consumer_.in ());
00277
00278 this->pre_dispatch_hook (const_cast<RtecEventComm::EventSet&> (event)
00279 ACE_ENV_ARG_PARAMETER);
00280 ACE_CHECK;
00281
00282 {
00283
00284 TAO_EC_Unlock reverse_lock (*this->lock_);
00285
00286 ACE_GUARD_THROW_EX (TAO_EC_Unlock, ace_mon, reverse_lock,
00287 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00288 ACE_CHECK;
00289
00290 this->event_channel_->dispatching ()->push (this,
00291 consumer.in (),
00292 event,
00293 qos_info
00294 ACE_ENV_ARG_PARAMETER);
00295 ACE_CHECK;
00296 }
00297
00298 if (this->child_ != 0)
00299 this->child_->clear ();
00300 }
00301
00302 void
00303 TAO_EC_ProxyPushSupplier::pre_dispatch_hook (RtecEventComm::EventSet&
00304 ACE_ENV_ARG_DECL_NOT_USED)
00305 {
00306 }
00307
00308 void
00309 TAO_EC_ProxyPushSupplier::push_nocopy (RtecEventComm::EventSet& event,
00310 TAO_EC_QOS_Info& qos_info
00311 ACE_ENV_ARG_DECL)
00312 {
00313
00314
00315 if (this->is_connected_i () == 0)
00316 return;
00317
00318 if (this->suspended_ != 0)
00319 return;
00320
00321 TAO_ESF_RefCount_Guard<CORBA::ULong> ace_mon (this->refcount_);
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332 RtecEventComm::PushConsumer_var consumer =
00333 RtecEventComm::PushConsumer::_duplicate (this->consumer_.in ());
00334
00335 this->pre_dispatch_hook (event ACE_ENV_ARG_PARAMETER);
00336 ACE_CHECK;
00337
00338 {
00339 TAO_EC_Unlock reverse_lock (*this->lock_);
00340
00341 ACE_GUARD_THROW_EX (TAO_EC_Unlock, ace_mon, reverse_lock,
00342 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00343 ACE_CHECK;
00344
00345 this->event_channel_->dispatching ()->push_nocopy (this,
00346 consumer.in (),
00347 event,
00348 qos_info
00349 ACE_ENV_ARG_PARAMETER);
00350 ACE_CHECK;
00351 }
00352
00353 if (this->child_ != 0)
00354 this->child_->clear ();
00355 }
00356
00357 void
00358 TAO_EC_ProxyPushSupplier::push_to_consumer (
00359 RtecEventComm::PushConsumer_ptr consumer,
00360 const RtecEventComm::EventSet& event
00361 ACE_ENV_ARG_DECL)
00362 {
00363 {
00364 ACE_GUARD_THROW_EX (
00365 ACE_Lock, ace_mon, *this->lock_,
00366 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00367 ACE_CHECK;
00368
00369 if (this->is_connected_i () == 0)
00370 return;
00371
00372 if (this->suspended_ != 0)
00373 return;
00374 }
00375
00376 ACE_TRY
00377 {
00378 consumer->push (event ACE_ENV_ARG_PARAMETER);
00379 ACE_TRY_CHECK;
00380 }
00381 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_used)
00382 {
00383
00384
00385
00386
00387
00388
00389
00390 if (consumer == this->consumer_.in ())
00391 {
00392 TAO_EC_ConsumerControl *control =
00393 this->event_channel_->consumer_control ();
00394
00395 control->consumer_not_exist (this ACE_ENV_ARG_PARAMETER);
00396 ACE_TRY_CHECK;
00397 }
00398 }
00399 ACE_CATCH (CORBA::SystemException, sysex)
00400 {
00401
00402
00403
00404
00405
00406
00407
00408 if (consumer == this->consumer_.in ())
00409 {
00410 TAO_EC_ConsumerControl *control =
00411 this->event_channel_->consumer_control ();
00412
00413 control->system_exception (this,
00414 sysex
00415 ACE_ENV_ARG_PARAMETER);
00416 ACE_TRY_CHECK;
00417 }
00418 }
00419 ACE_CATCHANY
00420 {
00421
00422 }
00423 ACE_ENDTRY;
00424 }
00425
00426 void
00427 TAO_EC_ProxyPushSupplier::reactive_push_to_consumer (
00428 RtecEventComm::PushConsumer_ptr consumer,
00429 const RtecEventComm::EventSet& event
00430 ACE_ENV_ARG_DECL)
00431 {
00432 ACE_TRY
00433 {
00434 consumer->push (event ACE_ENV_ARG_PARAMETER);
00435 ACE_TRY_CHECK;
00436 }
00437 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_used)
00438 {
00439 TAO_EC_ConsumerControl *control =
00440 this->event_channel_->consumer_control ();
00441
00442 control->consumer_not_exist (this ACE_ENV_ARG_PARAMETER);
00443 ACE_CHECK;
00444 }
00445 ACE_CATCH (CORBA::SystemException, sysex)
00446 {
00447 TAO_EC_ConsumerControl *control =
00448 this->event_channel_->consumer_control ();
00449
00450 control->system_exception (this,
00451 sysex
00452 ACE_ENV_ARG_PARAMETER);
00453 ACE_CHECK;
00454 }
00455 ACE_CATCHANY
00456 {
00457
00458 }
00459 ACE_ENDTRY;
00460 }
00461
00462 CORBA::Boolean
00463 TAO_EC_ProxyPushSupplier::consumer_non_existent (
00464 CORBA::Boolean_out disconnected
00465 ACE_ENV_ARG_DECL)
00466 {
00467 CORBA::Object_var consumer;
00468 {
00469 ACE_GUARD_THROW_EX (
00470 ACE_Lock, ace_mon, *this->lock_,
00471 CORBA::INTERNAL ());
00472 ACE_CHECK_RETURN (0);
00473
00474 disconnected = 0;
00475 if (this->is_connected_i () == 0)
00476 {
00477 disconnected = 1;
00478 return 0;
00479 }
00480
00481 consumer = CORBA::Object::_duplicate (this->consumer_.in ());
00482 }
00483
00484 #if (TAO_HAS_MINIMUM_CORBA == 0)
00485 return consumer->_non_existent (ACE_ENV_SINGLE_ARG_PARAMETER);
00486 #else
00487 return 0;
00488 #endif
00489 }
00490
00491 void
00492 TAO_EC_ProxyPushSupplier::clear (void)
00493 {
00494 ACE_GUARD (ACE_Lock, ace_mon, *this->lock_);
00495
00496 this->child_->clear ();
00497 }
00498
00499 CORBA::ULong
00500 TAO_EC_ProxyPushSupplier::max_event_size (void) const
00501 {
00502 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00503
00504 return this->child_->max_event_size ();
00505 }
00506
00507 int
00508 TAO_EC_ProxyPushSupplier::can_match (
00509 const RtecEventComm::EventHeader &header) const
00510 {
00511 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00512
00513 if (this->is_connected_i () == 0)
00514 return 0;
00515
00516 return this->child_->can_match (header);
00517 }
00518
00519 int
00520 TAO_EC_ProxyPushSupplier::add_dependencies (
00521 const RtecEventComm::EventHeader &header,
00522 const TAO_EC_QOS_Info &qos_info
00523 ACE_ENV_ARG_DECL)
00524 {
00525 ACE_GUARD_THROW_EX (
00526 ACE_Lock, ace_mon, *this->lock_,
00527 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00528 ACE_CHECK_RETURN (0);
00529
00530 return this->child_->add_dependencies (header,
00531 qos_info
00532 ACE_ENV_ARG_PARAMETER);
00533 }
00534
00535 TAO_END_VERSIONED_NAMESPACE_DECL