00001
00002
00003 #include "orbsvcs/Event/EC_ProxySupplier.h"
00004 #include "orbsvcs/Event/EC_Dispatching.h"
00005 #include "orbsvcs/Event/EC_Filter_Builder.h"
00006 #include "orbsvcs/Event/EC_QOS_Info.h"
00007 #include "orbsvcs/Event/EC_Event_Channel_Base.h"
00008 #include "orbsvcs/Event/EC_Scheduling_Strategy.h"
00009 #include "orbsvcs/Event/EC_ConsumerControl.h"
00010 #include "orbsvcs/Event/EC_SupplierAdmin.h"
00011 #include "orbsvcs/ESF/ESF_RefCount_Guard.h"
00012 #include "orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h"
00013
00014 #include "ace/Reverse_Lock_T.h"
00015
00016 #if ! defined (__ACE_INLINE__)
00017 #include "orbsvcs/Event/EC_ProxySupplier.inl"
00018 #endif
00019
00020 ACE_RCSID (Event,
00021 EC_ProxySupplier,
00022 "$Id: EC_ProxySupplier.cpp 77052 2007-02-12 18:59:05Z johnnyw $")
00023
00024 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00025
// Reverse-lock adapter over ACE_Lock.  push() and push_nocopy() wrap the
// proxy's lock_ in one of these and hold a guard on the wrapper while they
// call into the dispatching strategy.
// NOTE(review): presumably acquiring the wrapper releases the underlying
// lock and releasing it re-acquires -- confirm against ACE_Reverse_Lock.
00026 typedef ACE_Reverse_Lock<ACE_Lock> TAO_EC_Unlock;
00027
00028 TAO_EC_ProxyPushSupplier::TAO_EC_ProxyPushSupplier (TAO_EC_Event_Channel_Base* ec, int validate_connection)
00029 : event_channel_ (ec),
00030 refcount_ (1),
00031 suspended_ (0),
00032 child_ (0),
00033 consumer_validate_connection_(validate_connection)
00034 {
00035 this->lock_ =
00036 this->event_channel_->create_supplier_lock ();
00037
00038 this->default_POA_ =
00039 this->event_channel_->supplier_poa ();
00040 }
00041
00042 TAO_EC_ProxyPushSupplier::~TAO_EC_ProxyPushSupplier (void)
00043 {
00044 this->event_channel_->destroy_supplier_lock (this->lock_);
00045 this->cleanup_i ();
00046 }
00047
00048 void
00049 TAO_EC_ProxyPushSupplier::connected (TAO_EC_ProxyPushConsumer* consumer)
00050 {
00051 TAO_EC_Scheduling_Strategy *s =
00052 this->event_channel_->scheduling_strategy ();
00053
00054 s->add_proxy_supplier_dependencies (this,
00055 consumer);
00056 }
00057
00058 void
00059 TAO_EC_ProxyPushSupplier::reconnected (TAO_EC_ProxyPushConsumer* consumer)
00060 {
00061 TAO_EC_Scheduling_Strategy *s =
00062 this->event_channel_->scheduling_strategy ();
00063
00064 s->add_proxy_supplier_dependencies (this,
00065 consumer);
00066 }
00067
00068 void
00069 TAO_EC_ProxyPushSupplier::disconnected (TAO_EC_ProxyPushConsumer*)
00070 {
00071 }
00072
00073 void
00074 TAO_EC_ProxyPushSupplier::connected (TAO_EC_ProxyPushSupplier*)
00075 {
00076 }
00077
00078 void
00079 TAO_EC_ProxyPushSupplier::reconnected (TAO_EC_ProxyPushSupplier*)
00080 {
00081 }
00082
00083 void
00084 TAO_EC_ProxyPushSupplier::disconnected (TAO_EC_ProxyPushSupplier*)
00085 {
00086 }
00087
00088 void
00089 TAO_EC_ProxyPushSupplier::shutdown (void)
00090 {
00091
00092
00093 RtecEventComm::PushConsumer_var consumer;
00094
00095 {
00096 ACE_GUARD_THROW_EX (
00097 ACE_Lock, ace_mon, *this->lock_,
00098 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00099
00100 int connected = this->is_connected_i ();
00101
00102 consumer = this->consumer_._retn ();
00103
00104 if (connected)
00105 this->cleanup_i ();
00106 }
00107
00108 this->deactivate ();
00109
00110 if (CORBA::is_nil (consumer.in ()))
00111 return;
00112
00113 try
00114 {
00115 consumer->disconnect_push_consumer ();
00116 }
00117 catch (const CORBA::Exception&)
00118 {
00119
00120
00121 }
00122 }
00123
00124 void
00125 TAO_EC_ProxyPushSupplier::cleanup_i (void)
00126 {
00127 this->consumer_ =
00128 RtecEventComm::PushConsumer::_nil ();
00129
00130
00131
00132 delete this->child_;
00133 this->child_ = 0;
00134 }
00135
00136 void
00137 TAO_EC_ProxyPushSupplier::deactivate (void) throw ()
00138 {
00139 try
00140 {
00141 PortableServer::ObjectId id = this->object_id ();
00142 this->default_POA_->deactivate_object (id);
00143 }
00144 catch (const CORBA::Exception&)
00145 {
00146
00147
00148
00149
00150 }
00151 }
00152
00153 CORBA::ULong
00154 TAO_EC_ProxyPushSupplier::_incr_refcnt (void)
00155 {
00156 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00157 return this->refcount_++;
00158 }
00159
00160 void
00161 TAO_EC_ProxyPushSupplier::refcount_zero_hook (void)
00162 {
00163
00164 this->event_channel_->destroy_proxy (this);
00165 }
00166
00167 CORBA::ULong
00168 TAO_EC_ProxyPushSupplier::_decr_refcnt (void)
00169 {
00170 {
00171 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00172 --this->refcount_;
00173 if (this->refcount_ != 0)
00174 return this->refcount_;
00175 }
00176
00177 this->refcount_zero_hook ();
00178 return 0;
00179 }
00180
// Guard type constructed at the top of filter() and filter_nocopy() from
// the proxy's refcount_, its event channel and the proxy itself.
// NOTE(review): presumably it pins the reference count for the duration of
// the call and destroys the proxy through the event channel if the count
// reaches zero on exit -- confirm against TAO_ESF_Proxy_RefCount_Guard.
00181 typedef TAO_ESF_Proxy_RefCount_Guard<TAO_EC_Event_Channel_Base,TAO_EC_ProxyPushSupplier> Destroy_Guard;
00182
00183 int
00184 TAO_EC_ProxyPushSupplier::filter (const RtecEventComm::EventSet& event,
00185 TAO_EC_QOS_Info& qos_info)
00186 {
00187 Destroy_Guard auto_destroy (this->refcount_,
00188 this->event_channel_,
00189 this);
00190
00191 int result = 0;
00192 {
00193 ACE_GUARD_THROW_EX (
00194 ACE_Lock, ace_mon, *this->lock_,
00195 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00196
00197 if (this->is_connected_i () == 0)
00198 return 0;
00199
00200 result = this->child_->filter (event, qos_info);
00201 }
00202 return result;
00203 }
00204
00205 int
00206 TAO_EC_ProxyPushSupplier::filter_nocopy (RtecEventComm::EventSet& event,
00207 TAO_EC_QOS_Info& qos_info)
00208 {
00209 Destroy_Guard auto_destroy (this->refcount_,
00210 this->event_channel_,
00211 this);
00212
00213 int result = 0;
00214 {
00215 ACE_GUARD_THROW_EX (
00216 ACE_Lock, ace_mon, *this->lock_,
00217 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00218
00219 if (this->is_connected_i () == 0)
00220 return 0;
00221
00222 result = this->child_->filter_nocopy (event, qos_info);
00223 }
00224 return result;
00225 }
00226
00227 void
00228 TAO_EC_ProxyPushSupplier::push (const RtecEventComm::EventSet& event,
00229 TAO_EC_QOS_Info& qos_info)
00230 {
00231
00232
00233 if (this->is_connected_i () == 0)
00234 return;
00235
00236 if (this->suspended_ != 0)
00237 return;
00238
00239 TAO_ESF_RefCount_Guard<CORBA::ULong> ace_mon (this->refcount_);
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250 RtecEventComm::PushConsumer_var consumer =
00251 RtecEventComm::PushConsumer::_duplicate (this->consumer_.in ());
00252
00253 this->pre_dispatch_hook (const_cast<RtecEventComm::EventSet&> (event));
00254
00255 {
00256
00257 TAO_EC_Unlock reverse_lock (*this->lock_);
00258
00259 ACE_GUARD_THROW_EX (TAO_EC_Unlock, ace_mon, reverse_lock,
00260 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00261
00262 this->event_channel_->dispatching ()->push (this,
00263 consumer.in (),
00264 event,
00265 qos_info);
00266 }
00267
00268 if (this->child_ != 0)
00269 this->child_->clear ();
00270 }
00271
00272 void
00273 TAO_EC_ProxyPushSupplier::pre_dispatch_hook (RtecEventComm::EventSet&)
00274 {
00275 }
00276
00277 void
00278 TAO_EC_ProxyPushSupplier::push_nocopy (RtecEventComm::EventSet& event,
00279 TAO_EC_QOS_Info& qos_info)
00280 {
00281
00282
00283 if (this->is_connected_i () == 0)
00284 return;
00285
00286 if (this->suspended_ != 0)
00287 return;
00288
00289 TAO_ESF_RefCount_Guard<CORBA::ULong> ace_mon (this->refcount_);
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300 RtecEventComm::PushConsumer_var consumer =
00301 RtecEventComm::PushConsumer::_duplicate (this->consumer_.in ());
00302
00303 this->pre_dispatch_hook (event);
00304
00305 {
00306 TAO_EC_Unlock reverse_lock (*this->lock_);
00307
00308 ACE_GUARD_THROW_EX (TAO_EC_Unlock, ace_mon, reverse_lock,
00309 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00310
00311 this->event_channel_->dispatching ()->push_nocopy (this,
00312 consumer.in (),
00313 event,
00314 qos_info);
00315 }
00316
00317 if (this->child_ != 0)
00318 this->child_->clear ();
00319 }
00320
00321 void
00322 TAO_EC_ProxyPushSupplier::push_to_consumer (
00323 RtecEventComm::PushConsumer_ptr consumer,
00324 const RtecEventComm::EventSet& event)
00325 {
00326 {
00327 ACE_GUARD_THROW_EX (
00328 ACE_Lock, ace_mon, *this->lock_,
00329 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00330
00331 if (this->is_connected_i () == 0)
00332 return;
00333
00334 if (this->suspended_ != 0)
00335 return;
00336 }
00337
00338 try
00339 {
00340 consumer->push (event);
00341 }
00342 catch (const CORBA::OBJECT_NOT_EXIST&)
00343 {
00344
00345
00346
00347
00348
00349
00350
00351 if (consumer == this->consumer_.in ())
00352 {
00353 TAO_EC_ConsumerControl *control =
00354 this->event_channel_->consumer_control ();
00355
00356 control->consumer_not_exist (this);
00357 }
00358 }
00359 catch (CORBA::SystemException& sysex)
00360 {
00361
00362
00363
00364
00365
00366
00367
00368 if (consumer == this->consumer_.in ())
00369 {
00370 TAO_EC_ConsumerControl *control =
00371 this->event_channel_->consumer_control ();
00372
00373 control->system_exception (this,
00374 sysex);
00375 }
00376 }
00377 catch (const CORBA::Exception&)
00378 {
00379
00380 }
00381 }
00382
00383 void
00384 TAO_EC_ProxyPushSupplier::reactive_push_to_consumer (
00385 RtecEventComm::PushConsumer_ptr consumer,
00386 const RtecEventComm::EventSet& event)
00387 {
00388 try
00389 {
00390 consumer->push (event);
00391 }
00392 catch (const CORBA::OBJECT_NOT_EXIST&)
00393 {
00394 TAO_EC_ConsumerControl *control =
00395 this->event_channel_->consumer_control ();
00396
00397 control->consumer_not_exist (this);
00398 }
00399 catch (CORBA::SystemException& sysex)
00400 {
00401 TAO_EC_ConsumerControl *control =
00402 this->event_channel_->consumer_control ();
00403
00404 control->system_exception (this,
00405 sysex);
00406 }
00407 catch (const CORBA::Exception&)
00408 {
00409
00410 }
00411 }
00412
00413 CORBA::Boolean
00414 TAO_EC_ProxyPushSupplier::consumer_non_existent (
00415 CORBA::Boolean_out disconnected)
00416 {
00417 CORBA::Object_var consumer;
00418 {
00419 ACE_GUARD_THROW_EX (
00420 ACE_Lock, ace_mon, *this->lock_,
00421 CORBA::INTERNAL ());
00422
00423 disconnected = 0;
00424 if (this->is_connected_i () == 0)
00425 {
00426 disconnected = 1;
00427 return 0;
00428 }
00429
00430 consumer = CORBA::Object::_duplicate (this->consumer_.in ());
00431 }
00432
00433 #if (TAO_HAS_MINIMUM_CORBA == 0)
00434 return consumer->_non_existent ();
00435 #else
00436 return 0;
00437 #endif
00438 }
00439
00440 void
00441 TAO_EC_ProxyPushSupplier::clear (void)
00442 {
00443 ACE_GUARD (ACE_Lock, ace_mon, *this->lock_);
00444
00445 this->child_->clear ();
00446 }
00447
00448 CORBA::ULong
00449 TAO_EC_ProxyPushSupplier::max_event_size (void) const
00450 {
00451 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00452
00453 return this->child_->max_event_size ();
00454 }
00455
00456 int
00457 TAO_EC_ProxyPushSupplier::can_match (
00458 const RtecEventComm::EventHeader &header) const
00459 {
00460 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
00461
00462 if (this->is_connected_i () == 0)
00463 return 0;
00464
00465 return this->child_->can_match (header);
00466 }
00467
00468 int
00469 TAO_EC_ProxyPushSupplier::add_dependencies (
00470 const RtecEventComm::EventHeader &header,
00471 const TAO_EC_QOS_Info &qos_info)
00472 {
00473 ACE_GUARD_THROW_EX (
00474 ACE_Lock, ace_mon, *this->lock_,
00475 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
00476
00477 return this->child_->add_dependencies (header,
00478 qos_info);
00479 }
00480
00481 TAO_END_VERSIONED_NAMESPACE_DECL