00001
00002
00003 #include "orbsvcs/Notify/EventChannel.h"
00004
00005 #include "orbsvcs/Notify/Container_T.h"
00006 #include "orbsvcs/Notify/EventChannelFactory.h"
00007 #include "orbsvcs/Notify/ConsumerAdmin.h"
00008 #include "orbsvcs/Notify/SupplierAdmin.h"
00009 #include "orbsvcs/Notify/Properties.h"
00010 #include "orbsvcs/Notify/Factory.h"
00011 #include "orbsvcs/Notify/Builder.h"
00012 #include "orbsvcs/Notify/Find_Worker_T.h"
00013 #include "orbsvcs/Notify/Seq_Worker_T.h"
00014 #include "orbsvcs/Notify/Topology_Saver.h"
00015 #include "orbsvcs/Notify/Save_Persist_Worker_T.h"
00016 #include "orbsvcs/Notify/Reconnect_Worker_T.h"
00017 #include "orbsvcs/Notify/Proxy.h"
00018 #include "orbsvcs/Notify/Event_Manager.h"
00019 #include "orbsvcs/Notify/POA_Helper.h"
00020
00021 #include "tao/debug.h"
00022
// Debug verbosity used by the logging in this file. Defaults to
// TAO_debug_level unless DEBUG_LEVEL was already defined before this point.
#ifndef DEBUG_LEVEL
# define DEBUG_LEVEL TAO_debug_level
#endif //DEBUG_LEVEL
00026
00027 ACE_RCSID(Notify, TAO_Notify_EventChannel, "$Id: EventChannel.cpp 80974 2008-03-17 13:40:44Z johnnyw $")
00028
00029 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00030
00031 typedef TAO_Notify_Find_Worker_T<TAO_Notify_ConsumerAdmin
00032 , CosNotifyChannelAdmin::ConsumerAdmin
00033 , CosNotifyChannelAdmin::ConsumerAdmin_ptr
00034 , CosNotifyChannelAdmin::AdminNotFound>
00035 TAO_Notify_ConsumerAdmin_Find_Worker;
00036
00037 typedef TAO_Notify_Find_Worker_T<TAO_Notify_SupplierAdmin
00038 , CosNotifyChannelAdmin::SupplierAdmin
00039 , CosNotifyChannelAdmin::SupplierAdmin_ptr
00040 , CosNotifyChannelAdmin::AdminNotFound>
00041 TAO_Notify_SupplierAdmin_Find_Worker;
00042
00043 typedef TAO_Notify_Seq_Worker_T<TAO_Notify_ConsumerAdmin> TAO_Notify_ConsumerAdmin_Seq_Worker;
00044 typedef TAO_Notify_Seq_Worker_T<TAO_Notify_SupplierAdmin> TAO_Notify_SupplierAdmin_Seq_Worker;
00045
// Default constructor: the factory pointer and both admin containers start
// out null and the default filter factory starts out nil. The init()
// overloads are what actually populate these members.
TAO_Notify_EventChannel::TAO_Notify_EventChannel (void)
: ecf_ (0)
, ca_container_ (0)
, sa_container_ (0)
, default_filter_factory_ (CosNotifyFilter::FilterFactory::_nil ())
{
}
00053
// Destructor: the body is empty, no explicit teardown happens here.
// destroy() is the method that detaches the channel from its factory and
// drops the admin containers.
TAO_Notify_EventChannel::~TAO_Notify_EventChannel ()
{
}
00057
00058 void
00059 TAO_Notify_EventChannel::init (TAO_Notify_EventChannelFactory* ecf
00060 , const CosNotification::QoSProperties & initial_qos
00061 , const CosNotification::AdminProperties & initial_admin)
00062 {
00063 ACE_ASSERT (this->ca_container_.get() == 0);
00064
00065
00066 initialize (ecf);
00067
00068 this->ecf_.reset (ecf);
00069
00070
00071 TAO_Notify_ConsumerAdmin_Container* ca_container = 0;
00072 ACE_NEW_THROW_EX (ca_container,
00073 TAO_Notify_ConsumerAdmin_Container (),
00074 CORBA::INTERNAL ());
00075 this->ca_container_.reset (ca_container);
00076
00077 this->ca_container().init ();
00078
00079
00080 TAO_Notify_SupplierAdmin_Container* sa_container = 0;
00081 ACE_NEW_THROW_EX (sa_container,
00082 TAO_Notify_SupplierAdmin_Container (),
00083 CORBA::INTERNAL ());
00084 this->sa_container_.reset (sa_container);
00085
00086 this->sa_container().init ();
00087
00088
00089 TAO_Notify_AdminProperties* admin_properties = 0;
00090 ACE_NEW_THROW_EX (admin_properties,
00091 TAO_Notify_AdminProperties (),
00092 CORBA::NO_MEMORY ());
00093 this->set_admin_properties (admin_properties);
00094
00095
00096 TAO_Notify_Event_Manager* event_manager = 0;
00097 ACE_NEW_THROW_EX (event_manager,
00098 TAO_Notify_Event_Manager (),
00099 CORBA::INTERNAL ());
00100 this->set_event_manager (event_manager);
00101
00102 this->event_manager().init ();
00103
00104 const CosNotification::QoSProperties &default_ec_qos =
00105 TAO_Notify_PROPERTIES::instance ()->default_event_channel_qos_properties ();
00106
00107 this->set_qos (default_ec_qos);
00108
00109 this->set_qos (initial_qos);
00110
00111 this->set_admin (initial_admin);
00112
00113 PortableServer::POA_var default_poa = TAO_Notify_PROPERTIES::instance ()->default_poa ();
00114 this->default_filter_factory_ =
00115 TAO_Notify_PROPERTIES::instance()->builder()->build_filter_factory (default_poa.in());
00116
00117
00118
00119
00120
00121 }
00122
00123
00124 void
00125 TAO_Notify_EventChannel::init (TAO_Notify::Topology_Parent* parent)
00126 {
00127 ACE_ASSERT (this->ecf_.get() == 0);
00128
00129 initialize (parent);
00130
00131 this->ecf_.reset (dynamic_cast <TAO_Notify_EventChannelFactory*>(parent));
00132 ACE_ASSERT (this->ecf_.get() !=0);
00133
00134
00135 TAO_Notify_ConsumerAdmin_Container* ca_container = 0;
00136 ACE_NEW_THROW_EX (ca_container,
00137 TAO_Notify_ConsumerAdmin_Container (),
00138 CORBA::INTERNAL ());
00139 this->ca_container_.reset (ca_container);
00140
00141 this->ca_container().init ();
00142
00143 TAO_Notify_SupplierAdmin_Container* sa_container = 0;
00144
00145 ACE_NEW_THROW_EX (sa_container,
00146 TAO_Notify_SupplierAdmin_Container (),
00147 CORBA::INTERNAL ());
00148 this->sa_container_.reset (sa_container);
00149
00150 this->sa_container().init ();
00151
00152
00153 TAO_Notify_AdminProperties* admin_properties = 0;
00154 ACE_NEW_THROW_EX (admin_properties,
00155 TAO_Notify_AdminProperties (),
00156 CORBA::NO_MEMORY ());
00157 this->set_admin_properties (admin_properties);
00158
00159
00160 TAO_Notify_Event_Manager* event_manager = 0;
00161 ACE_NEW_THROW_EX (event_manager,
00162 TAO_Notify_Event_Manager (),
00163 CORBA::INTERNAL ());
00164 this->set_event_manager (event_manager);
00165
00166 this->event_manager().init ();
00167
00168 const CosNotification::QoSProperties &default_ec_qos =
00169 TAO_Notify_PROPERTIES::instance ()->default_event_channel_qos_properties ();
00170
00171 this->set_qos (default_ec_qos);
00172
00173 PortableServer::POA_var default_poa = TAO_Notify_PROPERTIES::instance ()->default_poa ();
00174 this->default_filter_factory_ =
00175 TAO_Notify_PROPERTIES::instance()->builder()->build_filter_factory (default_poa.in ());
00176 }
00177
00178
00179 void
00180 TAO_Notify_EventChannel::_add_ref (void)
00181 {
00182 this->_incr_refcnt ();
00183 }
00184
00185 void
00186 TAO_Notify_EventChannel::_remove_ref (void)
00187 {
00188 this->_decr_refcnt ();
00189 }
00190
// Destroys this object with `delete this`; the object must not be touched
// after this call returns.
// NOTE(review): presumably invoked by the reference-counting machinery once
// the last reference is gone -- confirm against the base class.
void
TAO_Notify_EventChannel::release (void)
{
delete this;

}
00197
// No-op for an event channel: both arguments are ignored and nothing is
// done.
void
TAO_Notify_EventChannel::cleanup_proxy (CosNotifyChannelAdmin::ProxyID, bool)
{ }
00201
00202 int
00203 TAO_Notify_EventChannel::shutdown (void)
00204 {
00205 int sd_ret = TAO_Notify_Object::shutdown ();
00206
00207 if (sd_ret == 1)
00208 return 1;
00209
00210 this->ca_container().shutdown ();
00211
00212 this->sa_container().shutdown ();
00213
00214 this->event_manager().shutdown ();
00215
00216 return 0;
00217 }
00218
00219 void
00220 TAO_Notify_EventChannel::destroy (void)
00221 {
00222 TAO_Notify_EventChannel::Ptr guard( this );
00223
00224 if (this->shutdown () == 1)
00225 return;
00226
00227 this->ecf_->remove (this);
00228
00229 this->sa_container ().destroy ();
00230 this->ca_container ().destroy ();
00231
00232 this->sa_container_.reset( 0 );
00233 this->ca_container_.reset( 0 );
00234
00235 this->default_filter_factory_ = CosNotifyFilter::FilterFactory::_nil();
00236 }
00237
00238 void
00239 TAO_Notify_EventChannel::remove (TAO_Notify_ConsumerAdmin* consumer_admin)
00240 {
00241 this->ca_container().remove (consumer_admin);
00242 }
00243
00244 void
00245 TAO_Notify_EventChannel::remove (TAO_Notify_SupplierAdmin* supplier_admin)
00246 {
00247 this->sa_container().remove (supplier_admin);
00248 }
00249
00250 void
00251 TAO_Notify_EventChannel::set_qos (const CosNotification::QoSProperties & qos)
00252 {
00253 this->TAO_Notify_Object::set_qos (qos);
00254 }
00255
00256 CosNotification::QoSProperties*
00257 TAO_Notify_EventChannel::get_qos (void)
00258 {
00259 return this->TAO_Notify_Object::get_qos ();
00260 }
00261
00262 CosNotifyChannelAdmin::EventChannelFactory_ptr
00263 TAO_Notify_EventChannel::MyFactory (void)
00264 {
00265 return this->ecf_->_this ();
00266 }
00267
00268 CosNotifyChannelAdmin::ConsumerAdmin_ptr
00269 TAO_Notify_EventChannel::default_consumer_admin (void)
00270 {
00271 if (CORBA::is_nil (default_consumer_admin_.in ()))
00272 {
00273 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->default_admin_mutex_, CosNotifyChannelAdmin::ConsumerAdmin::_nil());
00274 if (CORBA::is_nil (default_consumer_admin_.in ()))
00275 {
00276 CosNotifyChannelAdmin::AdminID id;
00277 this->default_consumer_admin_ = this->new_for_consumers (CosNotifyChannelAdmin::OR_OP, id);
00278
00279 PortableServer::ServantBase * admin_servant =
00280 this->poa()->reference_to_servant (
00281 this->default_consumer_admin_.in ());
00282 TAO_Notify_Admin * pAdmin = dynamic_cast <TAO_Notify_Admin *> (admin_servant);
00283 ACE_ASSERT (pAdmin != 0);
00284 if (pAdmin != 0)
00285 {
00286 pAdmin->set_default (true);
00287 }
00288 }
00289 }
00290 return CosNotifyChannelAdmin::ConsumerAdmin::_duplicate (this->default_consumer_admin_.in ());
00291 }
00292
00293 CosNotifyChannelAdmin::SupplierAdmin_ptr
00294 TAO_Notify_EventChannel::default_supplier_admin (void)
00295 {
00296 if (CORBA::is_nil (default_supplier_admin_.in ()))
00297 {
00298 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->default_admin_mutex_, CosNotifyChannelAdmin::SupplierAdmin::_nil());
00299 if (CORBA::is_nil (default_supplier_admin_.in ()))
00300 {
00301 CosNotifyChannelAdmin::AdminID id;
00302 this->default_supplier_admin_ = this->new_for_suppliers (CosNotifyChannelAdmin::OR_OP, id);
00303 PortableServer::ServantBase * admin_servant =
00304 this->poa()->poa()->reference_to_servant (
00305 this->default_supplier_admin_.in ());
00306 TAO_Notify_Admin * pAdmin = dynamic_cast <TAO_Notify_Admin *> (admin_servant);
00307 ACE_ASSERT (pAdmin != 0);
00308 if (pAdmin != 0)
00309 {
00310 pAdmin->set_default (true);
00311 }
00312 }
00313 }
00314 return CosNotifyChannelAdmin::SupplierAdmin::_duplicate (this->default_supplier_admin_.in ());
00315 }
00316
00317 ::CosNotifyFilter::FilterFactory_ptr
00318 TAO_Notify_EventChannel::default_filter_factory (void)
00319 {
00320 return CosNotifyFilter::FilterFactory::_duplicate (this->default_filter_factory_.in ());
00321 }
00322
00323 ::CosNotifyChannelAdmin::ConsumerAdmin_ptr
00324 TAO_Notify_EventChannel::new_for_consumers (CosNotifyChannelAdmin::InterFilterGroupOperator op,
00325 CosNotifyChannelAdmin::AdminID_out id)
00326 {
00327 ::CosNotifyChannelAdmin::ConsumerAdmin_var ca =
00328 TAO_Notify_PROPERTIES::instance()->builder()->build_consumer_admin (this, op, id);
00329 this->self_change ();
00330 return ca._retn ();
00331 }
00332
00333 ::CosNotifyChannelAdmin::SupplierAdmin_ptr
00334 TAO_Notify_EventChannel::new_for_suppliers (CosNotifyChannelAdmin::InterFilterGroupOperator op,
00335 CosNotifyChannelAdmin::AdminID_out id)
00336 {
00337 ::CosNotifyChannelAdmin::SupplierAdmin_var sa =
00338 TAO_Notify_PROPERTIES::instance()->builder()->build_supplier_admin (this, op, id);
00339 this->self_change ();
00340 return sa._retn ();
00341 }
00342
00343 CosNotifyChannelAdmin::ConsumerAdmin_ptr
00344 TAO_Notify_EventChannel::get_consumeradmin (CosNotifyChannelAdmin::AdminID id)
00345 {
00346 TAO_Notify_ConsumerAdmin_Find_Worker find_worker;
00347
00348 return find_worker.resolve (id, this->ca_container());
00349 }
00350
00351 CosNotifyChannelAdmin::SupplierAdmin_ptr
00352 TAO_Notify_EventChannel::get_supplieradmin (CosNotifyChannelAdmin::AdminID id)
00353 {
00354 TAO_Notify_SupplierAdmin_Find_Worker find_worker;
00355
00356 return find_worker.resolve (id, this->sa_container());
00357 }
00358
00359 CosNotifyChannelAdmin::AdminIDSeq*
00360 TAO_Notify_EventChannel::get_all_consumeradmins (void)
00361 {
00362 TAO_Notify_ConsumerAdmin_Seq_Worker seq_worker;
00363
00364 return seq_worker.create (this->ca_container());
00365 }
00366
00367 CosNotifyChannelAdmin::AdminIDSeq*
00368 TAO_Notify_EventChannel::get_all_supplieradmins (void)
00369 {
00370 TAO_Notify_SupplierAdmin_Seq_Worker seq_worker;
00371
00372 return seq_worker.create (this->sa_container());
00373 }
00374
00375 void
00376 TAO_Notify_EventChannel::set_admin (const CosNotification::AdminProperties & admin)
00377 {
00378 this->admin_properties().init (admin);
00379 }
00380
00381 CosNotification::AdminProperties*
00382 TAO_Notify_EventChannel::get_admin (void)
00383 {
00384 CosNotification::AdminProperties_var properties;
00385
00386 ACE_NEW_THROW_EX (properties,
00387 CosNotification::AdminProperties (),
00388 CORBA::NO_MEMORY ());
00389
00390 this->admin_properties().populate (properties);
00391
00392 return properties._retn ();
00393 }
00394
00395 CosEventChannelAdmin::ConsumerAdmin_ptr
00396 TAO_Notify_EventChannel::for_consumers (void)
00397 {
00398 return this->default_consumer_admin();
00399 }
00400
00401 CosEventChannelAdmin::SupplierAdmin_ptr
00402 TAO_Notify_EventChannel::for_suppliers (void)
00403 {
00404 return this->default_supplier_admin ();
00405 }
00406
// Not implemented for event channels: both parameters are unused and the
// call always throws CORBA::NO_IMPLEMENT.
void
TAO_Notify_EventChannel::validate_qos (const CosNotification::QoSProperties & ,
CosNotification::NamedPropertyRangeSeq_out )
{
throw CORBA::NO_IMPLEMENT ();
}
00413
00414 void
00415 TAO_Notify_EventChannel::save_persistent (TAO_Notify::Topology_Saver& saver)
00416 {
00417 bool changed = this->self_changed_;
00418 this->self_changed_ = false;
00419 this->children_changed_ = false;
00420
00421 if (is_persistent ())
00422 {
00423 TAO_Notify::NVPList attrs;
00424 this->save_attrs(attrs);
00425
00426 bool want_all_children = saver.begin_object(
00427 this->id(), "channel", attrs, changed);
00428
00429 TAO_Notify::Save_Persist_Worker<TAO_Notify_ConsumerAdmin> ca_wrk(saver, want_all_children);
00430
00431 this->ca_container().collection()->for_each(&ca_wrk);
00432
00433 TAO_Notify::Save_Persist_Worker<TAO_Notify_SupplierAdmin> sa_wrk(saver, want_all_children);
00434 this->sa_container().collection()->for_each(&sa_wrk);
00435
00436 saver.end_object(this->id(), "channel");
00437 }
00438 }
00439
00440 namespace {
00441 template<class T>
00442 void add_attr(TAO_Notify::NVPList& attrs, const T& prop) {
00443 if (prop.is_valid())
00444 {
00445 attrs.push_back(TAO_Notify::NVP (prop));
00446 }
00447 }
00448 }
00449
00450 void
00451 TAO_Notify_EventChannel::save_attrs(TAO_Notify::NVPList& attrs)
00452 {
00453 TAO_Notify_Object::save_attrs(attrs);
00454 add_attr(attrs, this->admin_properties().max_global_queue_length());
00455 add_attr(attrs, this->admin_properties().max_consumers());
00456 add_attr(attrs, this->admin_properties().max_suppliers());
00457 add_attr(attrs, this->admin_properties().reject_new_events());
00458 }
00459
00460 void
00461 TAO_Notify_EventChannel::load_attrs(const TAO_Notify::NVPList& attrs)
00462 {
00463 TAO_Notify_Object::load_attrs(attrs);
00464 attrs.load(this->admin_properties().max_global_queue_length());
00465 attrs.load(this->admin_properties().max_consumers());
00466 attrs.load(this->admin_properties().max_suppliers());
00467 attrs.load(this->admin_properties().reject_new_events());
00468 this->admin_properties().init();
00469 }
00470
00471 TAO_Notify::Topology_Object *
00472 TAO_Notify_EventChannel::load_child (const ACE_CString &type,
00473 CORBA::Long id,
00474 const TAO_Notify::NVPList& attrs)
00475 {
00476 TAO_Notify::Topology_Object* result = this;
00477 if (type == "consumer_admin")
00478 {
00479 if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00480 ACE_TEXT ("(%P|%t) EventChannel reload consumer_admin %d\n")
00481 , static_cast<int> (id)
00482 ));
00483
00484
00485 TAO_Notify_Builder* bld = TAO_Notify_PROPERTIES::instance()->builder();
00486 TAO_Notify_ConsumerAdmin * ca = bld->build_consumer_admin (
00487 this,
00488 id);
00489 ca->load_attrs (attrs);
00490 if (ca->is_default ())
00491 {
00492 CORBA::Object_var caob = this->poa()->servant_to_reference (ca);
00493 this->default_consumer_admin_ =
00494 CosNotifyChannelAdmin::ConsumerAdmin::_narrow (
00495 caob.in ());
00496 }
00497 result = ca;
00498 }
00499 else if (type == "supplier_admin")
00500 {
00501 if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00502 ACE_TEXT ("(%P|%t) EventChannel reload supplier_admin %d\n")
00503 , static_cast<int> (id)
00504 ));
00505 TAO_Notify_Builder* bld = TAO_Notify_PROPERTIES::instance()->builder();
00506
00507 TAO_Notify_SupplierAdmin * sa = bld->build_supplier_admin (
00508 this,
00509 id);
00510 sa->load_attrs (attrs);
00511 if (sa->is_default ())
00512 {
00513 CORBA::Object_var saob = this->poa()->servant_to_reference (sa);
00514 this->default_supplier_admin_ =
00515 CosNotifyChannelAdmin::SupplierAdmin::_narrow (
00516 saob.in ());
00517 }
00518 result = sa;
00519 }
00520 return result;
00521 }
00522 TAO_Notify_ProxyConsumer *
00523 TAO_Notify_EventChannel::find_proxy_consumer (TAO_Notify::IdVec & id_path, size_t position)
00524 {
00525 TAO_Notify_ProxyConsumer * result = 0;
00526 size_t path_size = id_path.size ();
00527 if (position < path_size)
00528 {
00529 TAO_Notify_SupplierAdmin_Find_Worker find_worker;
00530 TAO_Notify_SupplierAdmin * admin = find_worker.find (id_path[position], this->sa_container());
00531 ++position;
00532 if (admin != 0)
00533 {
00534 result = admin->find_proxy_consumer (id_path, position);
00535 }
00536 }
00537 return result;
00538 }
00539
00540 TAO_Notify_ProxySupplier *
00541 TAO_Notify_EventChannel::find_proxy_supplier (TAO_Notify::IdVec & id_path, size_t position)
00542 {
00543 TAO_Notify_ProxySupplier * result = 0;
00544 size_t path_size = id_path.size ();
00545 if (position < path_size)
00546 {
00547 TAO_Notify_ConsumerAdmin_Find_Worker find_worker;
00548 TAO_Notify_ConsumerAdmin * admin = find_worker.find (id_path[position], this->ca_container());
00549 ++position;
00550 if (admin != 0)
00551 {
00552 result = admin->find_proxy_supplier (id_path, position);
00553 }
00554 }
00555 return result;
00556 }
00557
00558
00559 void
00560 TAO_Notify_EventChannel::reconnect (void)
00561 {
00562 TAO_Notify::Reconnect_Worker<TAO_Notify_ConsumerAdmin> ca_wrk;
00563 this->ca_container().collection()->for_each(&ca_wrk);
00564
00565 TAO_Notify::Reconnect_Worker<TAO_Notify_SupplierAdmin> sa_wrk;
00566 this->sa_container().collection()->for_each(&sa_wrk);
00567 }
00568
00569 TAO_Notify_EventChannel::TAO_Notify_ConsumerAdmin_Container&
00570 TAO_Notify_EventChannel::ca_container()
00571 {
00572 ACE_ASSERT( this->ca_container_.get() != 0 );
00573 return *ca_container_;
00574 }
00575
00576 TAO_Notify_EventChannel::TAO_Notify_SupplierAdmin_Container&
00577 TAO_Notify_EventChannel::sa_container()
00578 {
00579 ACE_ASSERT( this->sa_container_.get() != 0 );
00580 return *sa_container_;
00581 }
00582
00583 TAO_END_VERSIONED_NAMESPACE_DECL