00001
00002
00003 #include "orbsvcs/Event/EC_Gateway_IIOP.h"
00004 #include "orbsvcs/Event/ECG_Defaults.h"
00005 #include "orbsvcs/Event_Utilities.h"
00006 #include "orbsvcs/Time_Utilities.h"
00007
00008 #include "orbsvcs/Event/EC_Gateway_IIOP_Factory.h"
00009 #include "orbsvcs/Event/ECG_ConsumerEC_Control.h"
00010
00011 #include "ace/Dynamic_Service.h"
00012
00013 ACE_RCSID (Event,
00014 EC_Gateway_IIOP,
00015 "$Id: EC_Gateway_IIOP.cpp 81419 2008-04-24 11:35:22Z johnnyw $")
00016
00017 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00018
00019 TAO_EC_Gateway_IIOP::TAO_EC_Gateway_IIOP (void)
00020 : busy_count_ (0),
00021 update_posted_ (0),
00022 cleanup_posted_ (0),
00023 supplier_ec_suspended_ (0),
00024 supplier_info_ (0),
00025 consumer_info_ (0),
00026 consumer_ (this),
00027 consumer_is_active_ (false),
00028 supplier_ (this),
00029 supplier_is_active_ (false),
00030 ec_control_ (0),
00031 factory_ (0),
00032 use_ttl_ (1),
00033 use_consumer_proxy_map_ (1)
00034 {
00035 if (this->factory_ == 0)
00036 {
00037 this->factory_ =
00038 ACE_Dynamic_Service<TAO_EC_Gateway_IIOP_Factory>::instance ("EC_Gateway_IIOP_Factory");
00039
00040 if (this->factory_ == 0)
00041 {
00042 TAO_EC_Gateway_IIOP_Factory *f = 0;
00043 ACE_NEW (f,
00044 TAO_EC_Gateway_IIOP_Factory);
00045 this->factory_ = f;
00046 }
00047 }
00048
00049 if (this->factory_ != 0)
00050 {
00051 this->use_ttl_ = this->factory_->use_ttl();
00052 this->use_consumer_proxy_map_ = this->factory_->use_consumer_proxy_map();
00053 }
00054 }
00055
00056 TAO_EC_Gateway_IIOP::~TAO_EC_Gateway_IIOP (void)
00057 {
00058 delete ec_control_;
00059 ec_control_ = 0;
00060 }
00061
00062 int
00063 TAO_EC_Gateway_IIOP::init (RtecEventChannelAdmin::EventChannel_ptr supplier_ec,
00064 RtecEventChannelAdmin::EventChannel_ptr consumer_ec)
00065 {
00066 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00067
00068 return this->init_i (supplier_ec, consumer_ec);
00069 }
00070
00071 int
00072 TAO_EC_Gateway_IIOP::init_i (RtecEventChannelAdmin::EventChannel_ptr supplier_ec,
00073 RtecEventChannelAdmin::EventChannel_ptr consumer_ec)
00074 {
00075 if (CORBA::is_nil (this->supplier_ec_.in ()) && CORBA::is_nil (this->consumer_ec_.in ()))
00076 {
00077 this->supplier_ec_ =
00078 RtecEventChannelAdmin::EventChannel::_duplicate (supplier_ec);
00079 this->consumer_ec_ =
00080 RtecEventChannelAdmin::EventChannel::_duplicate (consumer_ec);
00081
00082 if (ec_control_ == 0)
00083 {
00084 ec_control_ = factory_->create_consumerec_control(this);
00085 ec_control_->activate();
00086 }
00087
00088 return 0;
00089 }
00090 else
00091 ACE_ERROR_RETURN ((LM_ERROR,
00092 "TAO_EC_Gateway_IIOP - init_i "
00093 "Supplier and consumer event channel reference "
00094 "should be nil.\n"), -1);
00095 }
00096
00097 void
00098 TAO_EC_Gateway_IIOP::close (void)
00099 {
00100 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00101
00102 this->close_i ();
00103 }
00104
00105 void
00106 TAO_EC_Gateway_IIOP::cleanup_consumer_proxies (void)
00107 {
00108 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00109
00110
00111 if (this->busy_count_ != 0)
00112 {
00113 this->cleanup_posted_ = 1;
00114 return;
00115 }
00116
00117 this->cleanup_consumer_proxies_i ();
00118 }
00119
00120 void
00121 TAO_EC_Gateway_IIOP::close_i (void)
00122 {
00123
00124 this->disconnect_consumer_proxies_i ();
00125
00126 this->disconnect_supplier_proxy_i ();
00127 }
00128
00129 void
00130 TAO_EC_Gateway_IIOP::disconnect_consumer_proxies_i (void)
00131 {
00132 if (this->consumer_proxy_map_.current_size () > 0)
00133 {
00134 for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00135 j != this->consumer_proxy_map_.end ();
00136 ++j)
00137 {
00138 RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
00139 if (CORBA::is_nil (consumer))
00140 continue;
00141 try
00142 {
00143 consumer->disconnect_push_consumer ();
00144 }
00145 catch (const CORBA::Exception&)
00146 {
00147 }
00148 CORBA::release (consumer);
00149 }
00150
00151
00152 this->consumer_proxy_map_.open ();
00153 }
00154
00155 if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
00156 {
00157 this->default_consumer_proxy_->disconnect_push_consumer ();
00158
00159 this->default_consumer_proxy_ =
00160 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00161 }
00162 }
00163
00164 void
00165 TAO_EC_Gateway_IIOP::disconnect_supplier_proxy_i (void)
00166 {
00167 if (!CORBA::is_nil (this->supplier_proxy_.in ()))
00168 {
00169 this->supplier_proxy_->disconnect_push_supplier ();
00170
00171 this->supplier_proxy_ =
00172 RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
00173 }
00174 }
00175
00176 void
00177 TAO_EC_Gateway_IIOP::reconnect_consumer_ec(void)
00178 {
00179 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00180
00181 if (this->busy_count_ != 0)
00182 {
00183 this->update_posted_ = 1;
00184 return;
00185 }
00186
00187 this->update_consumer_i (c_qos_);
00188 }
00189
00190 void
00191 TAO_EC_Gateway_IIOP::update_consumer (
00192 const RtecEventChannelAdmin::ConsumerQOS& c_qos)
00193 {
00194 if (c_qos.dependencies.length () == 0)
00195 return;
00196
00197 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00198
00199 this->c_qos_ = c_qos;
00200
00201 if (this->busy_count_ != 0)
00202 {
00203 this->update_posted_ = 1;
00204 return;
00205 }
00206
00207 this->update_consumer_i (c_qos);
00208 }
00209
00210 void
00211 TAO_EC_Gateway_IIOP::cleanup_consumer_proxies_i (void)
00212 {
00213 if (this->consumer_proxy_map_.current_size () > 0)
00214 {
00215 for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00216 j != this->consumer_proxy_map_.end ();
00217 ++j)
00218 {
00219 RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
00220 if (CORBA::is_nil (consumer))
00221 continue;
00222
00223 CORBA::release (consumer);
00224 }
00225
00226
00227 this->consumer_proxy_map_.open ();
00228 }
00229
00230 if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
00231 {
00232 this->default_consumer_proxy_ =
00233 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00234 }
00235 }
00236
00237 void
00238 TAO_EC_Gateway_IIOP::update_consumer_i (
00239 const RtecEventChannelAdmin::ConsumerQOS& c_qos)
00240 {
00241 this->close_i ();
00242
00243 if (CORBA::is_nil (this->consumer_ec_.in ())
00244 || CORBA::is_nil (this->supplier_ec_.in ()))
00245 return;
00246
00247
00248
00249 this->open_i (c_qos);
00250 }
00251
00252 void
00253 TAO_EC_Gateway_IIOP::open_i (
00254 const RtecEventChannelAdmin::ConsumerQOS& c_qos)
00255 {
00256
00257 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
00258 this->consumer_ec_->for_suppliers ();
00259
00260 RtecEventChannelAdmin::ConsumerQOS sub = c_qos;
00261 sub.is_gateway = 1;
00262
00263
00264
00265
00266 for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i)
00267 {
00268 sub.dependencies[i].rt_info = this->supplier_info_;
00269
00270 RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
00271 const RtecEventComm::EventHeader &h =
00272 sub.dependencies[i].event.header;
00273
00274 RtecEventComm::EventSourceID sid = h.source;
00275
00276
00277
00278
00279
00280
00281
00282
00283 if (sid == ACE_ES_EVENT_SOURCE_ANY || this->use_consumer_proxy_map_ == 0)
00284 continue;
00285
00286
00287 if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
00288 continue;
00289
00290 if (this->consumer_proxy_map_.find (sid, proxy) == -1)
00291 {
00292
00293
00294
00295 proxy = supplier_admin->obtain_push_consumer ();
00296 this->consumer_proxy_map_.bind (sid, proxy);
00297 }
00298 }
00299
00300
00301
00302
00303 if (this->consumer_proxy_map_.current_size () > 0)
00304 {
00305 this->supplier_is_active_ = true;
00306
00307
00308 RtecEventComm::PushSupplier_var supplier_ref = this->supplier_._this ();
00309
00310
00311
00312
00313 for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00314 j != this->consumer_proxy_map_.end ();
00315 ++j)
00316 {
00317 RtecEventChannelAdmin::SupplierQOS pub;
00318 pub.publications.length (sub.dependencies.length () + 1);
00319 pub.is_gateway = 1;
00320
00321 int c = 0;
00322
00323 RtecEventComm::EventSourceID sid = (*j).ext_id_;
00324 for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
00325 {
00326 const RtecEventComm::EventHeader& h =
00327 sub.dependencies[k].event.header;
00328 if (h.source != sid
00329 || (ACE_ES_EVENT_ANY < h.type
00330 && h.type < ACE_ES_EVENT_UNDEFINED))
00331 continue;
00332 pub.publications[c].event.header = h;
00333 pub.publications[c].dependency_info.dependency_type =
00334 RtecBase::TWO_WAY_CALL;
00335 pub.publications[c].dependency_info.number_of_calls = 1;
00336 pub.publications[c].dependency_info.rt_info = this->consumer_info_;
00337 c++;
00338 }
00339
00340
00341
00342 if (c == 0)
00343 continue;
00344
00345 pub.publications.length (c);
00346
00347
00348
00349 (*j).int_id_->connect_push_supplier (supplier_ref.in (),
00350 pub);
00351 }
00352 }
00353
00354
00355
00356
00357 RtecEventChannelAdmin::SupplierQOS pub;
00358 pub.publications.length (sub.dependencies.length () + 1);
00359 pub.is_gateway = 1;
00360 int c = 0;
00361 for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
00362 {
00363 const RtecEventComm::EventHeader& h =
00364 sub.dependencies[k].event.header;
00365 RtecEventComm::EventSourceID sid = h.source;
00366
00367
00368 if (sid != ACE_ES_EVENT_SOURCE_ANY && this->use_consumer_proxy_map_ == 1)
00369 continue;
00370
00371
00372 if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
00373 continue;
00374
00375 pub.publications[c].event.header = h;
00376 pub.publications[c].event.header.creation_time = ORBSVCS_Time::zero ();
00377 pub.publications[c].dependency_info.dependency_type =
00378 RtecBase::TWO_WAY_CALL;
00379 pub.publications[c].dependency_info.number_of_calls = 1;
00380 pub.publications[c].dependency_info.rt_info = this->consumer_info_;
00381 c++;
00382 }
00383
00384 if (c > 0)
00385 {
00386 this->supplier_is_active_ = true;
00387
00388
00389 RtecEventComm::PushSupplier_var supplier_ref =
00390 this->supplier_._this ();
00391
00392
00393 this->default_consumer_proxy_ =
00394 supplier_admin->obtain_push_consumer ();
00395
00396 pub.publications.length (c);
00397
00398
00399 this->default_consumer_proxy_->connect_push_supplier (supplier_ref.in (),
00400 pub);
00401 }
00402
00403 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
00404 this->supplier_ec_->for_consumers ();
00405
00406 this->supplier_proxy_ =
00407 consumer_admin->obtain_push_supplier ();
00408
00409 this->consumer_is_active_ = true;
00410 RtecEventComm::PushConsumer_var consumer_ref =
00411 this->consumer_._this ();
00412
00413
00414
00415
00416 this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
00417 sub);
00418 }
00419
00420 void
00421 TAO_EC_Gateway_IIOP::update_supplier (
00422 const RtecEventChannelAdmin::SupplierQOS&)
00423 {
00424
00425 }
00426
00427 void
00428 TAO_EC_Gateway_IIOP::disconnect_push_consumer (void)
00429 {
00430
00431
00432
00433 }
00434
00435 void
00436 TAO_EC_Gateway_IIOP::disconnect_push_supplier (void)
00437 {
00438
00439
00440
00441 }
00442
00443 void
00444 TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events)
00445 {
00446
00447
00448 if (events.length () == 0)
00449 {
00450
00451 return;
00452 }
00453
00454 {
00455 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00456
00457 this->busy_count_++;
00458 }
00459
00460
00461
00462
00463
00464 RtecEventComm::EventSet out (1);
00465 out.length (1);
00466 for (CORBA::ULong i = 0; i < events.length (); ++i)
00467 {
00468 if (this->use_ttl_ == 1)
00469 {
00470 if (events[i].header.ttl == 0)
00471 continue;
00472 }
00473
00474 RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
00475 RtecEventComm::EventSourceID sid = events[i].header.source;
00476 if (sid == ACE_ES_EVENT_SOURCE_ANY || this->use_consumer_proxy_map_ == 0
00477 || this->consumer_proxy_map_.find (sid, proxy) == -1)
00478 {
00479
00480
00481 proxy = this->default_consumer_proxy_.in ();
00482 }
00483
00484 if (CORBA::is_nil (proxy))
00485 continue;
00486
00487 out[0] = events[i];
00488
00489 if (this->use_ttl_ == 1)
00490 out[0].header.ttl--;
00491
00492
00493 this->push_to_consumer(proxy, out);
00494 }
00495
00496 {
00497 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00498
00499 this->busy_count_--;
00500
00501 if (this->busy_count_ == 0 && this->cleanup_posted_ != 0)
00502 {
00503 this->cleanup_posted_ = 0;
00504 this->cleanup_consumer_proxies_i ();
00505 }
00506
00507 if (this->busy_count_ == 0 && this->update_posted_ != 0)
00508 {
00509 this->update_posted_ = 0;
00510 this->update_consumer_i (this->c_qos_);
00511 }
00512 }
00513 }
00514
00515 void
00516 TAO_EC_Gateway_IIOP::push_to_consumer (
00517 RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer,
00518 const RtecEventComm::EventSet& event)
00519 {
00520 try
00521 {
00522 consumer->push (event);
00523 }
00524 catch (const CORBA::OBJECT_NOT_EXIST&)
00525 {
00526 ec_control_->event_channel_not_exist (this);
00527 }
00528 catch (CORBA::SystemException& sysex)
00529 {
00530 ec_control_->system_exception (this,
00531 sysex);
00532 }
00533 catch (const CORBA::Exception&)
00534 {
00535
00536 }
00537 }
00538
00539 int
00540 TAO_EC_Gateway_IIOP::shutdown (void)
00541 {
00542 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00543
00544 ec_control_->shutdown();
00545
00546 this->close_i ();
00547
00548 if (this->supplier_is_active_)
00549 {
00550 PortableServer::POA_var poa =
00551 this->supplier_._default_POA ();
00552 PortableServer::ObjectId_var id =
00553 poa->servant_to_id (&this->supplier_);
00554 poa->deactivate_object (id.in ());
00555 this->supplier_is_active_ = false;
00556 }
00557
00558 if (this->consumer_is_active_)
00559 {
00560 PortableServer::POA_var poa =
00561 this->consumer_._default_POA ();
00562 PortableServer::ObjectId_var id =
00563 poa->servant_to_id (&this->consumer_);
00564 poa->deactivate_object (id.in ());
00565 this->consumer_is_active_ = false;
00566 }
00567
00568 this->cleanup_consumer_ec_i ();
00569 this->cleanup_supplier_ec_i ();
00570
00571 return 0;
00572 }
00573
00574 int
00575 TAO_EC_Gateway_IIOP::cleanup_consumer_ec (void)
00576 {
00577 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00578
00579 this->cleanup_consumer_ec_i ();
00580
00581 return 0;
00582 }
00583
00584 int
00585 TAO_EC_Gateway_IIOP::cleanup_supplier_ec (void)
00586 {
00587 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00588
00589 this->cleanup_supplier_ec_i ();
00590
00591 return 0;
00592 }
00593
00594 void
00595 TAO_EC_Gateway_IIOP::cleanup_consumer_ec_i (void)
00596 {
00597 this->consumer_ec_ =
00598 RtecEventChannelAdmin::EventChannel::_nil ();
00599 }
00600
00601 void
00602 TAO_EC_Gateway_IIOP::cleanup_supplier_ec_i (void)
00603 {
00604 this->supplier_ec_ =
00605 RtecEventChannelAdmin::EventChannel::_nil ();
00606 }
00607
00608 CORBA::Boolean
00609 TAO_EC_Gateway_IIOP::is_consumer_ec_connected_i (void) const
00610 {
00611 return !CORBA::is_nil (this->consumer_ec_.in ());
00612 }
00613
00614 CORBA::Boolean
00615 TAO_EC_Gateway_IIOP::consumer_ec_non_existent (
00616 CORBA::Boolean_out disconnected)
00617 {
00618 CORBA::Object_var consumer_ec;
00619 {
00620 ACE_GUARD_THROW_EX (
00621 TAO_SYNCH_MUTEX, ace_mon, this->lock_,
00622 CORBA::INTERNAL ());
00623
00624 disconnected = 0;
00625 if (this->is_consumer_ec_connected_i () == 0)
00626 {
00627 disconnected = 1;
00628 return 0;
00629 }
00630
00631 consumer_ec = CORBA::Object::_duplicate (this->consumer_ec_.in ());
00632 }
00633
00634 #if (TAO_HAS_MINIMUM_CORBA == 0)
00635 return consumer_ec->_non_existent ();
00636 #else
00637 return 0;
00638 #endif
00639 }
00640
00641 void
00642 TAO_EC_Gateway_IIOP::suspend_supplier_ec (void)
00643 {
00644 if (!CORBA::is_nil (this->supplier_proxy_.in ()) && supplier_ec_suspended_ == 0)
00645 {
00646 this->supplier_proxy_->suspend_connection ();
00647
00648 supplier_ec_suspended_ = 1;
00649 }
00650 }
00651
00652 void
00653 TAO_EC_Gateway_IIOP::resume_supplier_ec (void)
00654 {
00655 if (!CORBA::is_nil (this->supplier_proxy_.in ()) && supplier_ec_suspended_ == 1)
00656 {
00657 this->supplier_proxy_->resume_connection ();
00658
00659 supplier_ec_suspended_ = 0;
00660 }
00661 }
00662
00663 TAO_END_VERSIONED_NAMESPACE_DECL