00001
00002
00003 #include "orbsvcs/Event/EC_Gateway_IIOP.h"
00004 #include "orbsvcs/Event/ECG_Defaults.h"
00005 #include "orbsvcs/Event_Utilities.h"
00006 #include "orbsvcs/Time_Utilities.h"
00007
00008 #include "orbsvcs/Event/EC_Gateway_IIOP_Factory.h"
00009 #include "orbsvcs/Event/ECG_ConsumerEC_Control.h"
00010
00011 #include "ace/Dynamic_Service.h"
00012
00013 ACE_RCSID (Event,
00014 EC_Gateway_IIOP,
00015 "$Id: EC_Gateway_IIOP.cpp 77001 2007-02-12 07:54:49Z johnnyw $")
00016
00017 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00018
00019 TAO_EC_Gateway_IIOP::TAO_EC_Gateway_IIOP (void)
00020 : busy_count_ (0),
00021 update_posted_ (0),
00022 cleanup_posted_ (0),
00023 supplier_ec_suspended_ (0),
00024 supplier_info_ (0),
00025 consumer_info_ (0),
00026 consumer_ (this),
00027 consumer_is_active_ (0),
00028 supplier_ (this),
00029 supplier_is_active_ (0),
00030 ec_control_ (0),
00031 factory_ (0),
00032 use_ttl_ (1),
00033 use_consumer_proxy_map_ (1)
00034 {
00035 if (this->factory_ == 0)
00036 {
00037 this->factory_ =
00038 ACE_Dynamic_Service<TAO_EC_Gateway_IIOP_Factory>::instance ("EC_Gateway_IIOP_Factory");
00039
00040 if (this->factory_ == 0)
00041 {
00042 TAO_EC_Gateway_IIOP_Factory *f = 0;
00043 ACE_NEW (f,
00044 TAO_EC_Gateway_IIOP_Factory);
00045 this->factory_ = f;
00046 }
00047 }
00048
00049 if (this->factory_ != 0)
00050 {
00051 this->use_ttl_ = this->factory_->use_ttl();
00052 this->use_consumer_proxy_map_ = this->factory_->use_consumer_proxy_map();
00053 }
00054 }
00055
00056 TAO_EC_Gateway_IIOP::~TAO_EC_Gateway_IIOP (void)
00057 {
00058 delete ec_control_;
00059 ec_control_ = 0;
00060 }
00061
00062 int
00063 TAO_EC_Gateway_IIOP::init (RtecEventChannelAdmin::EventChannel_ptr supplier_ec,
00064 RtecEventChannelAdmin::EventChannel_ptr consumer_ec)
00065 {
00066 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00067
00068 return this->init_i (supplier_ec, consumer_ec);
00069 }
00070
00071 int
00072 TAO_EC_Gateway_IIOP::init_i (RtecEventChannelAdmin::EventChannel_ptr supplier_ec,
00073 RtecEventChannelAdmin::EventChannel_ptr consumer_ec)
00074 {
00075 if (CORBA::is_nil (this->supplier_ec_.in ()) && CORBA::is_nil (this->consumer_ec_.in ()))
00076 {
00077 this->supplier_ec_ =
00078 RtecEventChannelAdmin::EventChannel::_duplicate (supplier_ec);
00079 this->consumer_ec_ =
00080 RtecEventChannelAdmin::EventChannel::_duplicate (consumer_ec);
00081
00082 if (ec_control_ == 0)
00083 {
00084 ec_control_ = factory_->create_consumerec_control(this);
00085 ec_control_->activate();
00086 }
00087
00088 return 0;
00089 }
00090 else
00091 ACE_ERROR_RETURN ((LM_ERROR,
00092 "TAO_EC_Gateway_IIOP - init_i "
00093 "Supplier and consumer event channel reference "
00094 "should be nil.\n"), -1);
00095 }
00096
00097 void
00098 TAO_EC_Gateway_IIOP::close (void)
00099 {
00100 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00101
00102 this->close_i ();
00103 }
00104
00105 void
00106 TAO_EC_Gateway_IIOP::cleanup_consumer_proxies (void)
00107 {
00108 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00109
00110
00111 if (this->busy_count_ != 0)
00112 {
00113 this->cleanup_posted_ = 1;
00114 return;
00115 }
00116
00117 this->cleanup_consumer_proxies_i ();
00118 }
00119
00120 void
00121 TAO_EC_Gateway_IIOP::close_i (void)
00122 {
00123
00124 this->disconnect_consumer_proxies_i ();
00125
00126 this->disconnect_supplier_proxy_i ();
00127 }
00128
00129 void
00130 TAO_EC_Gateway_IIOP::disconnect_consumer_proxies_i (void)
00131 {
00132 if (this->consumer_proxy_map_.current_size () > 0)
00133 {
00134 for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00135 j != this->consumer_proxy_map_.end ();
00136 ++j)
00137 {
00138 RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
00139 if (CORBA::is_nil (consumer))
00140 continue;
00141 try
00142 {
00143 consumer->disconnect_push_consumer ();
00144 }
00145 catch (const CORBA::Exception&)
00146 {
00147 }
00148 CORBA::release (consumer);
00149 }
00150
00151
00152 this->consumer_proxy_map_.open ();
00153 }
00154
00155 if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
00156 {
00157 this->default_consumer_proxy_->disconnect_push_consumer ();
00158
00159 this->default_consumer_proxy_ =
00160 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00161 }
00162 }
00163
00164 void
00165 TAO_EC_Gateway_IIOP::disconnect_supplier_proxy_i (void)
00166 {
00167 if (!CORBA::is_nil (this->supplier_proxy_.in ()))
00168 {
00169 this->supplier_proxy_->disconnect_push_supplier ();
00170
00171 this->supplier_proxy_ =
00172 RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
00173 }
00174 }
00175
00176 void
00177 TAO_EC_Gateway_IIOP::reconnect_consumer_ec(void)
00178 {
00179 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00180
00181 if (this->busy_count_ != 0)
00182 {
00183 this->update_posted_ = 1;
00184 return;
00185 }
00186
00187 this->update_consumer_i (c_qos_);
00188 }
00189
00190 void
00191 TAO_EC_Gateway_IIOP::update_consumer (
00192 const RtecEventChannelAdmin::ConsumerQOS& c_qos)
00193 {
00194 if (c_qos.dependencies.length () == 0)
00195 return;
00196
00197 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00198
00199 this->c_qos_ = c_qos;
00200
00201 if (this->busy_count_ != 0)
00202 {
00203 this->update_posted_ = 1;
00204 return;
00205 }
00206
00207 this->update_consumer_i (c_qos);
00208 }
00209
00210 void
00211 TAO_EC_Gateway_IIOP::cleanup_consumer_proxies_i (void)
00212 {
00213 if (this->consumer_proxy_map_.current_size () > 0)
00214 {
00215 for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00216 j != this->consumer_proxy_map_.end ();
00217 ++j)
00218 {
00219 RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
00220 if (CORBA::is_nil (consumer))
00221 continue;
00222
00223 CORBA::release (consumer);
00224 }
00225
00226
00227 this->consumer_proxy_map_.open ();
00228 }
00229
00230 if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
00231 {
00232 this->default_consumer_proxy_ =
00233 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
00234 }
00235 }
00236
00237 void
00238 TAO_EC_Gateway_IIOP::update_consumer_i (
00239 const RtecEventChannelAdmin::ConsumerQOS& c_qos)
00240 {
00241 this->close_i ();
00242
00243 if (CORBA::is_nil (this->consumer_ec_.in ())
00244 || CORBA::is_nil (this->supplier_ec_.in ()))
00245 return;
00246
00247
00248
00249 this->open_i (c_qos);
00250 }
00251
00252 void
00253 TAO_EC_Gateway_IIOP::open_i (
00254 const RtecEventChannelAdmin::ConsumerQOS& c_qos)
00255 {
00256
00257 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
00258 this->consumer_ec_->for_suppliers ();
00259
00260 RtecEventChannelAdmin::ConsumerQOS sub = c_qos;
00261 sub.is_gateway = 1;
00262
00263
00264
00265
00266 for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i)
00267 {
00268 sub.dependencies[i].rt_info = this->supplier_info_;
00269
00270 RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
00271 const RtecEventComm::EventHeader &h =
00272 sub.dependencies[i].event.header;
00273
00274 RtecEventComm::EventSourceID sid = h.source;
00275
00276
00277
00278
00279
00280
00281
00282
00283 if (sid == ACE_ES_EVENT_SOURCE_ANY || this->use_consumer_proxy_map_ == 0)
00284 continue;
00285
00286
00287 if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
00288 continue;
00289
00290 if (this->consumer_proxy_map_.find (sid, proxy) == -1)
00291 {
00292
00293
00294
00295 proxy = supplier_admin->obtain_push_consumer ();
00296 this->consumer_proxy_map_.bind (sid, proxy);
00297 }
00298 }
00299
00300
00301
00302
00303 if (this->consumer_proxy_map_.current_size () > 0)
00304 {
00305 this->supplier_is_active_ = 1;
00306
00307
00308 RtecEventComm::PushSupplier_var supplier_ref =
00309 this->supplier_._this ();
00310
00311
00312
00313
00314 for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
00315 j != this->consumer_proxy_map_.end ();
00316 ++j)
00317 {
00318 RtecEventChannelAdmin::SupplierQOS pub;
00319 pub.publications.length (sub.dependencies.length () + 1);
00320 pub.is_gateway = 1;
00321
00322 int c = 0;
00323
00324 RtecEventComm::EventSourceID sid = (*j).ext_id_;
00325 for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
00326 {
00327 const RtecEventComm::EventHeader& h =
00328 sub.dependencies[k].event.header;
00329 if (h.source != sid
00330 || (ACE_ES_EVENT_ANY < h.type
00331 && h.type < ACE_ES_EVENT_UNDEFINED))
00332 continue;
00333 pub.publications[c].event.header = h;
00334 pub.publications[c].dependency_info.dependency_type =
00335 RtecBase::TWO_WAY_CALL;
00336 pub.publications[c].dependency_info.number_of_calls = 1;
00337 pub.publications[c].dependency_info.rt_info = this->consumer_info_;
00338 c++;
00339 }
00340
00341
00342
00343 if (c == 0)
00344 continue;
00345
00346 pub.publications.length (c);
00347
00348
00349
00350 (*j).int_id_->connect_push_supplier (supplier_ref.in (),
00351 pub);
00352 }
00353 }
00354
00355
00356
00357
00358 RtecEventChannelAdmin::SupplierQOS pub;
00359 pub.publications.length (sub.dependencies.length () + 1);
00360 pub.is_gateway = 1;
00361 int c = 0;
00362 for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
00363 {
00364 const RtecEventComm::EventHeader& h =
00365 sub.dependencies[k].event.header;
00366 RtecEventComm::EventSourceID sid = h.source;
00367
00368
00369 if (sid != ACE_ES_EVENT_SOURCE_ANY && this->use_consumer_proxy_map_ == 1)
00370 continue;
00371
00372
00373 if (ACE_ES_EVENT_ANY < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
00374 continue;
00375
00376 pub.publications[c].event.header = h;
00377 pub.publications[c].event.header.creation_time = ORBSVCS_Time::zero ();
00378 pub.publications[c].dependency_info.dependency_type =
00379 RtecBase::TWO_WAY_CALL;
00380 pub.publications[c].dependency_info.number_of_calls = 1;
00381 pub.publications[c].dependency_info.rt_info = this->consumer_info_;
00382 c++;
00383 }
00384
00385 if (c > 0)
00386 {
00387 this->supplier_is_active_ = 1;
00388
00389
00390 RtecEventComm::PushSupplier_var supplier_ref =
00391 this->supplier_._this ();
00392
00393
00394 this->default_consumer_proxy_ =
00395 supplier_admin->obtain_push_consumer ();
00396
00397 pub.publications.length (c);
00398
00399
00400 this->default_consumer_proxy_->connect_push_supplier (supplier_ref.in (),
00401 pub);
00402 }
00403
00404 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
00405 this->supplier_ec_->for_consumers ();
00406
00407 this->supplier_proxy_ =
00408 consumer_admin->obtain_push_supplier ();
00409
00410 this->consumer_is_active_ = 1;
00411 RtecEventComm::PushConsumer_var consumer_ref =
00412 this->consumer_._this ();
00413
00414
00415
00416
00417 this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
00418 sub);
00419 }
00420
00421 void
00422 TAO_EC_Gateway_IIOP::update_supplier (
00423 const RtecEventChannelAdmin::SupplierQOS&)
00424 {
00425
00426 }
00427
00428 void
00429 TAO_EC_Gateway_IIOP::disconnect_push_consumer (void)
00430 {
00431
00432
00433
00434 }
00435
00436 void
00437 TAO_EC_Gateway_IIOP::disconnect_push_supplier (void)
00438 {
00439
00440
00441
00442 }
00443
00444 void
00445 TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events)
00446 {
00447
00448
00449 if (events.length () == 0)
00450 {
00451
00452 return;
00453 }
00454
00455 {
00456 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00457
00458 this->busy_count_++;
00459 }
00460
00461
00462
00463
00464
00465 RtecEventComm::EventSet out (1);
00466 out.length (1);
00467 for (CORBA::ULong i = 0; i < events.length (); ++i)
00468 {
00469 if (this->use_ttl_ == 1)
00470 {
00471 if (events[i].header.ttl == 0)
00472 continue;
00473 }
00474
00475 RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
00476 RtecEventComm::EventSourceID sid = events[i].header.source;
00477 if (sid == ACE_ES_EVENT_SOURCE_ANY || this->use_consumer_proxy_map_ == 0
00478 || this->consumer_proxy_map_.find (sid, proxy) == -1)
00479 {
00480
00481
00482 proxy = this->default_consumer_proxy_.in ();
00483 }
00484
00485 if (CORBA::is_nil (proxy))
00486 continue;
00487
00488 out[0] = events[i];
00489
00490 if (this->use_ttl_ == 1)
00491 out[0].header.ttl--;
00492
00493
00494 this->push_to_consumer(proxy, out);
00495 }
00496
00497 {
00498 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00499
00500 this->busy_count_--;
00501
00502 if (this->busy_count_ == 0 && this->cleanup_posted_ != 0)
00503 {
00504 this->cleanup_posted_ = 0;
00505 this->cleanup_consumer_proxies_i ();
00506 }
00507
00508 if (this->busy_count_ == 0 && this->update_posted_ != 0)
00509 {
00510 this->update_posted_ = 0;
00511 this->update_consumer_i (this->c_qos_);
00512 }
00513 }
00514 }
00515
00516 void
00517 TAO_EC_Gateway_IIOP::push_to_consumer (
00518 RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer,
00519 const RtecEventComm::EventSet& event)
00520 {
00521 try
00522 {
00523 consumer->push (event);
00524 }
00525 catch (const CORBA::OBJECT_NOT_EXIST&)
00526 {
00527 ec_control_->event_channel_not_exist (this);
00528 }
00529 catch (CORBA::SystemException& sysex)
00530 {
00531 ec_control_->system_exception (this,
00532 sysex);
00533 }
00534 catch (const CORBA::Exception&)
00535 {
00536
00537 }
00538 }
00539
00540 int
00541 TAO_EC_Gateway_IIOP::shutdown (void)
00542 {
00543 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00544
00545 ec_control_->shutdown();
00546
00547 this->close_i ();
00548
00549 if (this->supplier_is_active_)
00550 {
00551 PortableServer::POA_var poa =
00552 this->supplier_._default_POA ();
00553 PortableServer::ObjectId_var id =
00554 poa->servant_to_id (&this->supplier_);
00555 poa->deactivate_object (id.in ());
00556 this->supplier_is_active_ = 0;
00557 }
00558
00559 if (this->consumer_is_active_)
00560 {
00561 PortableServer::POA_var poa =
00562 this->consumer_._default_POA ();
00563 PortableServer::ObjectId_var id =
00564 poa->servant_to_id (&this->consumer_);
00565 poa->deactivate_object (id.in ());
00566 this->consumer_is_active_ = 0;
00567 }
00568
00569 this->cleanup_consumer_ec_i ();
00570 this->cleanup_supplier_ec_i ();
00571
00572 return 0;
00573 }
00574
00575 int
00576 TAO_EC_Gateway_IIOP::cleanup_consumer_ec (void)
00577 {
00578 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00579
00580 this->cleanup_consumer_ec_i ();
00581
00582 return 0;
00583 }
00584
00585 int
00586 TAO_EC_Gateway_IIOP::cleanup_supplier_ec (void)
00587 {
00588 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
00589
00590 this->cleanup_supplier_ec_i ();
00591
00592 return 0;
00593 }
00594
00595 void
00596 TAO_EC_Gateway_IIOP::cleanup_consumer_ec_i (void)
00597 {
00598 this->consumer_ec_ =
00599 RtecEventChannelAdmin::EventChannel::_nil ();
00600 }
00601
00602 void
00603 TAO_EC_Gateway_IIOP::cleanup_supplier_ec_i (void)
00604 {
00605 this->supplier_ec_ =
00606 RtecEventChannelAdmin::EventChannel::_nil ();
00607 }
00608
00609 CORBA::Boolean
00610 TAO_EC_Gateway_IIOP::is_consumer_ec_connected_i (void) const
00611 {
00612 return !CORBA::is_nil (this->consumer_ec_.in ());
00613 }
00614
00615 CORBA::Boolean
00616 TAO_EC_Gateway_IIOP::consumer_ec_non_existent (
00617 CORBA::Boolean_out disconnected)
00618 {
00619 CORBA::Object_var consumer_ec;
00620 {
00621 ACE_GUARD_THROW_EX (
00622 TAO_SYNCH_MUTEX, ace_mon, this->lock_,
00623 CORBA::INTERNAL ());
00624
00625 disconnected = 0;
00626 if (this->is_consumer_ec_connected_i () == 0)
00627 {
00628 disconnected = 1;
00629 return 0;
00630 }
00631
00632 consumer_ec = CORBA::Object::_duplicate (this->consumer_ec_.in ());
00633 }
00634
00635 #if (TAO_HAS_MINIMUM_CORBA == 0)
00636 return consumer_ec->_non_existent ();
00637 #else
00638 return 0;
00639 #endif
00640 }
00641
00642 void
00643 TAO_EC_Gateway_IIOP::suspend_supplier_ec (void)
00644 {
00645 if (!CORBA::is_nil (this->supplier_proxy_.in ()) && supplier_ec_suspended_ == 0)
00646 {
00647 this->supplier_proxy_->suspend_connection ();
00648
00649 supplier_ec_suspended_ = 1;
00650 }
00651 }
00652
00653 void
00654 TAO_EC_Gateway_IIOP::resume_supplier_ec (void)
00655 {
00656 if (!CORBA::is_nil (this->supplier_proxy_.in ()) && supplier_ec_suspended_ == 1)
00657 {
00658 this->supplier_proxy_->resume_connection ();
00659
00660 supplier_ec_suspended_ = 0;
00661 }
00662 }
00663
00664 TAO_END_VERSIONED_NAMESPACE_DECL