00001
00002
00003 #include "orbsvcs/Notify/EventChannel.h"
00004
00005 #include "orbsvcs/Notify/Container_T.h"
00006 #include "orbsvcs/Notify/EventChannelFactory.h"
00007 #include "orbsvcs/Notify/ConsumerAdmin.h"
00008 #include "orbsvcs/Notify/SupplierAdmin.h"
00009 #include "orbsvcs/Notify/Properties.h"
00010 #include "orbsvcs/Notify/Factory.h"
00011 #include "orbsvcs/Notify/Builder.h"
00012 #include "orbsvcs/Notify/Find_Worker_T.h"
00013 #include "orbsvcs/Notify/Seq_Worker_T.h"
00014 #include "orbsvcs/Notify/Topology_Saver.h"
00015 #include "orbsvcs/Notify/Save_Persist_Worker_T.h"
00016 #include "orbsvcs/Notify/Reconnect_Worker_T.h"
00017 #include "orbsvcs/Notify/Proxy.h"
00018 #include "orbsvcs/Notify/Event_Manager.h"
00019 #include "orbsvcs/Notify/POA_Helper.h"
00020
00021 #include "tao/debug.h"
00022
// Debug verbosity used by this file; defaults to the global TAO debug level
// unless DEBUG_LEVEL has already been defined.
#if !defined (DEBUG_LEVEL)
# define DEBUG_LEVEL TAO_debug_level
#endif /* DEBUG_LEVEL */
00026
00027 ACE_RCSID(Notify, TAO_Notify_EventChannel, "$Id: EventChannel.cpp 79084 2007-07-30 13:13:45Z elliott_c $")
00028
00029 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00030
00031 typedef TAO_Notify_Find_Worker_T<TAO_Notify_ConsumerAdmin
00032 , CosNotifyChannelAdmin::ConsumerAdmin
00033 , CosNotifyChannelAdmin::ConsumerAdmin_ptr
00034 , CosNotifyChannelAdmin::AdminNotFound>
00035 TAO_Notify_ConsumerAdmin_Find_Worker;
00036
00037 typedef TAO_Notify_Find_Worker_T<TAO_Notify_SupplierAdmin
00038 , CosNotifyChannelAdmin::SupplierAdmin
00039 , CosNotifyChannelAdmin::SupplierAdmin_ptr
00040 , CosNotifyChannelAdmin::AdminNotFound>
00041 TAO_Notify_SupplierAdmin_Find_Worker;
00042
00043 typedef TAO_Notify_Seq_Worker_T<TAO_Notify_ConsumerAdmin> TAO_Notify_ConsumerAdmin_Seq_Worker;
00044 typedef TAO_Notify_Seq_Worker_T<TAO_Notify_SupplierAdmin> TAO_Notify_SupplierAdmin_Seq_Worker;
00045
00046 TAO_Notify_EventChannel::TAO_Notify_EventChannel (void)
00047 : ecf_ (0)
00048 , ca_container_ (0)
00049 , sa_container_ (0)
00050 {
00051 }
00052
00053 TAO_Notify_EventChannel::~TAO_Notify_EventChannel ()
00054 {
00055 }
00056
00057 void
00058 TAO_Notify_EventChannel::init (TAO_Notify_EventChannelFactory* ecf
00059 , const CosNotification::QoSProperties & initial_qos
00060 , const CosNotification::AdminProperties & initial_admin)
00061 {
00062 ACE_ASSERT (this->ca_container_.get() == 0);
00063
00064
00065 initialize (ecf);
00066
00067 this->ecf_.reset (ecf);
00068
00069
00070 TAO_Notify_ConsumerAdmin_Container* ca_container = 0;
00071 ACE_NEW_THROW_EX (ca_container,
00072 TAO_Notify_ConsumerAdmin_Container (),
00073 CORBA::INTERNAL ());
00074 this->ca_container_.reset (ca_container);
00075
00076 this->ca_container().init ();
00077
00078
00079 TAO_Notify_SupplierAdmin_Container* sa_container = 0;
00080 ACE_NEW_THROW_EX (sa_container,
00081 TAO_Notify_SupplierAdmin_Container (),
00082 CORBA::INTERNAL ());
00083 this->sa_container_.reset (sa_container);
00084
00085 this->sa_container().init ();
00086
00087
00088 TAO_Notify_AdminProperties* admin_properties = 0;
00089 ACE_NEW_THROW_EX (admin_properties,
00090 TAO_Notify_AdminProperties (),
00091 CORBA::NO_MEMORY ());
00092 this->set_admin_properties (admin_properties);
00093
00094
00095 TAO_Notify_Event_Manager* event_manager = 0;
00096 ACE_NEW_THROW_EX (event_manager,
00097 TAO_Notify_Event_Manager (),
00098 CORBA::INTERNAL ());
00099 this->set_event_manager (event_manager);
00100
00101 this->event_manager().init ();
00102
00103 const CosNotification::QoSProperties &default_ec_qos =
00104 TAO_Notify_PROPERTIES::instance ()->default_event_channel_qos_properties ();
00105
00106 this->set_qos (default_ec_qos);
00107
00108 this->set_qos (initial_qos);
00109
00110 this->set_admin (initial_admin);
00111
00112
00113
00114
00115
00116 }
00117
00118
00119 void
00120 TAO_Notify_EventChannel::init (TAO_Notify::Topology_Parent* parent)
00121 {
00122 ACE_ASSERT (this->ecf_.get() == 0);
00123
00124 initialize (parent);
00125
00126 this->ecf_.reset (dynamic_cast <TAO_Notify_EventChannelFactory*>(parent));
00127 ACE_ASSERT (this->ecf_.get() !=0);
00128
00129
00130 TAO_Notify_ConsumerAdmin_Container* ca_container = 0;
00131 ACE_NEW_THROW_EX (ca_container,
00132 TAO_Notify_ConsumerAdmin_Container (),
00133 CORBA::INTERNAL ());
00134 this->ca_container_.reset (ca_container);
00135
00136 this->ca_container().init ();
00137
00138 TAO_Notify_SupplierAdmin_Container* sa_container = 0;
00139
00140 ACE_NEW_THROW_EX (sa_container,
00141 TAO_Notify_SupplierAdmin_Container (),
00142 CORBA::INTERNAL ());
00143 this->sa_container_.reset (sa_container);
00144
00145 this->sa_container().init ();
00146
00147
00148 TAO_Notify_AdminProperties* admin_properties = 0;
00149 ACE_NEW_THROW_EX (admin_properties,
00150 TAO_Notify_AdminProperties (),
00151 CORBA::NO_MEMORY ());
00152 this->set_admin_properties (admin_properties);
00153
00154
00155 TAO_Notify_Event_Manager* event_manager = 0;
00156 ACE_NEW_THROW_EX (event_manager,
00157 TAO_Notify_Event_Manager (),
00158 CORBA::INTERNAL ());
00159 this->set_event_manager (event_manager);
00160
00161 this->event_manager().init ();
00162
00163 const CosNotification::QoSProperties &default_ec_qos =
00164 TAO_Notify_PROPERTIES::instance ()->default_event_channel_qos_properties ();
00165
00166 this->set_qos (default_ec_qos);
00167
00168 }
00169
00170
00171 void
00172 TAO_Notify_EventChannel::_add_ref (void)
00173 {
00174 this->_incr_refcnt ();
00175 }
00176
00177 void
00178 TAO_Notify_EventChannel::_remove_ref (void)
00179 {
00180 this->_decr_refcnt ();
00181 }
00182
00183 void
00184 TAO_Notify_EventChannel::release (void)
00185 {
00186 delete this;
00187
00188 }
00189
00190 int
00191 TAO_Notify_EventChannel::shutdown (void)
00192 {
00193 int sd_ret = TAO_Notify_Object::shutdown ();
00194
00195 if (sd_ret == 1)
00196 return 1;
00197
00198 this->ca_container().shutdown ();
00199
00200 this->sa_container().shutdown ();
00201
00202 this->event_manager().shutdown ();
00203
00204 return 0;
00205 }
00206
00207 void
00208 TAO_Notify_EventChannel::destroy (void)
00209 {
00210 TAO_Notify_EventChannel::Ptr guard( this );
00211
00212 int result = this->shutdown ();
00213 if ( result == 1)
00214 return;
00215
00216 this->ecf_->remove (this);
00217
00218 this->sa_container ().destroy ();
00219 this->ca_container ().destroy ();
00220
00221 this->sa_container_.reset( 0 );
00222 this->ca_container_.reset( 0 );
00223 }
00224
00225 void
00226 TAO_Notify_EventChannel::remove (TAO_Notify_ConsumerAdmin* consumer_admin)
00227 {
00228 this->ca_container().remove (consumer_admin);
00229 }
00230
00231 void
00232 TAO_Notify_EventChannel::remove (TAO_Notify_SupplierAdmin* supplier_admin)
00233 {
00234 this->sa_container().remove (supplier_admin);
00235 }
00236
00237 void
00238 TAO_Notify_EventChannel::set_qos (const CosNotification::QoSProperties & qos)
00239 {
00240 this->TAO_Notify_Object::set_qos (qos);
00241 }
00242
00243 CosNotification::QoSProperties*
00244 TAO_Notify_EventChannel::get_qos (void)
00245 {
00246 return this->TAO_Notify_Object::get_qos ();
00247 }
00248
00249 CosNotifyChannelAdmin::EventChannelFactory_ptr
00250 TAO_Notify_EventChannel::MyFactory (void)
00251 {
00252 return this->ecf_->_this ();
00253 }
00254
00255 CosNotifyChannelAdmin::ConsumerAdmin_ptr
00256 TAO_Notify_EventChannel::default_consumer_admin (void)
00257 {
00258 if (CORBA::is_nil (default_consumer_admin_.in ()))
00259 {
00260 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->default_admin_mutex_, CosNotifyChannelAdmin::ConsumerAdmin::_nil());
00261 if (CORBA::is_nil (default_consumer_admin_.in ()))
00262 {
00263 CosNotifyChannelAdmin::AdminID id;
00264 this->default_consumer_admin_ = this->new_for_consumers (CosNotifyChannelAdmin::OR_OP, id);
00265
00266 PortableServer::ServantBase * admin_servant =
00267 this->poa()->reference_to_servant (
00268 this->default_consumer_admin_.in ());
00269 TAO_Notify_Admin * pAdmin = dynamic_cast <TAO_Notify_Admin *> (admin_servant);
00270 ACE_ASSERT (pAdmin != 0);
00271 if (pAdmin != 0)
00272 {
00273 pAdmin->set_default (true);
00274 }
00275 }
00276 }
00277 return CosNotifyChannelAdmin::ConsumerAdmin::_duplicate (this->default_consumer_admin_.in ());
00278 }
00279
00280 CosNotifyChannelAdmin::SupplierAdmin_ptr
00281 TAO_Notify_EventChannel::default_supplier_admin (void)
00282 {
00283 if (CORBA::is_nil (default_supplier_admin_.in ()))
00284 {
00285 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->default_admin_mutex_, CosNotifyChannelAdmin::SupplierAdmin::_nil());
00286 if (CORBA::is_nil (default_supplier_admin_.in ()))
00287 {
00288 CosNotifyChannelAdmin::AdminID id;
00289 this->default_supplier_admin_ = this->new_for_suppliers (CosNotifyChannelAdmin::OR_OP, id);
00290 PortableServer::ServantBase * admin_servant =
00291 this->poa()->poa()->reference_to_servant (
00292 this->default_supplier_admin_.in ());
00293 TAO_Notify_Admin * pAdmin = dynamic_cast <TAO_Notify_Admin *> (admin_servant);
00294 ACE_ASSERT (pAdmin != 0);
00295 if (pAdmin != 0)
00296 {
00297 pAdmin->set_default (true);
00298 }
00299 }
00300 }
00301 return CosNotifyChannelAdmin::SupplierAdmin::_duplicate (this->default_supplier_admin_.in ());
00302 }
00303
00304 ::CosNotifyFilter::FilterFactory_ptr TAO_Notify_EventChannel::default_filter_factory (void)
00305 {
00306 return this->ecf_->get_default_filter_factory ();
00307 }
00308
00309 ::CosNotifyChannelAdmin::ConsumerAdmin_ptr
00310 TAO_Notify_EventChannel::new_for_consumers (CosNotifyChannelAdmin::InterFilterGroupOperator op,
00311 CosNotifyChannelAdmin::AdminID_out id
00312 )
00313
00314 {
00315 ::CosNotifyChannelAdmin::ConsumerAdmin_var ca =
00316 TAO_Notify_PROPERTIES::instance()->builder()->build_consumer_admin (this, op, id);
00317 this->self_change ();
00318 return ca._retn ();
00319 }
00320
00321 ::CosNotifyChannelAdmin::SupplierAdmin_ptr
00322 TAO_Notify_EventChannel::new_for_suppliers (CosNotifyChannelAdmin::InterFilterGroupOperator op,
00323 CosNotifyChannelAdmin::AdminID_out id
00324 )
00325 {
00326 ::CosNotifyChannelAdmin::SupplierAdmin_var sa =
00327 TAO_Notify_PROPERTIES::instance()->builder()->build_supplier_admin (this, op, id);
00328 this->self_change ();
00329 return sa._retn ();
00330 }
00331
00332 CosNotifyChannelAdmin::ConsumerAdmin_ptr
00333 TAO_Notify_EventChannel::get_consumeradmin (CosNotifyChannelAdmin::AdminID id)
00334 {
00335 TAO_Notify_ConsumerAdmin_Find_Worker find_worker;
00336
00337 return find_worker.resolve (id, this->ca_container());
00338 }
00339
00340 CosNotifyChannelAdmin::SupplierAdmin_ptr
00341 TAO_Notify_EventChannel::get_supplieradmin (CosNotifyChannelAdmin::AdminID id)
00342 {
00343 TAO_Notify_SupplierAdmin_Find_Worker find_worker;
00344
00345 return find_worker.resolve (id, this->sa_container());
00346 }
00347
00348 CosNotifyChannelAdmin::AdminIDSeq*
00349 TAO_Notify_EventChannel::get_all_consumeradmins (void)
00350 {
00351 TAO_Notify_ConsumerAdmin_Seq_Worker seq_worker;
00352
00353 return seq_worker.create (this->ca_container());
00354 }
00355
00356 CosNotifyChannelAdmin::AdminIDSeq*
00357 TAO_Notify_EventChannel::get_all_supplieradmins (void)
00358 {
00359 TAO_Notify_SupplierAdmin_Seq_Worker seq_worker;
00360
00361 return seq_worker.create (this->sa_container());
00362 }
00363
00364 void
00365 TAO_Notify_EventChannel::set_admin (const CosNotification::AdminProperties & admin)
00366 {
00367 this->admin_properties().init (admin);
00368 }
00369
00370 CosNotification::AdminProperties*
00371 TAO_Notify_EventChannel::get_admin (void)
00372 {
00373 CosNotification::AdminProperties_var properties;
00374
00375 ACE_NEW_THROW_EX (properties,
00376 CosNotification::AdminProperties (),
00377 CORBA::NO_MEMORY ());
00378
00379 this->admin_properties().populate (properties);
00380
00381 return properties._retn ();
00382 }
00383
00384 CosEventChannelAdmin::ConsumerAdmin_ptr
00385 TAO_Notify_EventChannel::for_consumers (void)
00386 {
00387 return this->default_consumer_admin();
00388 }
00389
00390 CosEventChannelAdmin::SupplierAdmin_ptr
00391 TAO_Notify_EventChannel::for_suppliers (void)
00392 {
00393 return this->default_supplier_admin ();
00394 }
00395
00396 void
00397 TAO_Notify_EventChannel::validate_qos (const CosNotification::QoSProperties & ,
00398 CosNotification::NamedPropertyRangeSeq_out
00399 )
00400 {
00401 throw CORBA::NO_IMPLEMENT ();
00402 }
00403
00404 void
00405 TAO_Notify_EventChannel::save_persistent (TAO_Notify::Topology_Saver& saver)
00406 {
00407 bool changed = this->self_changed_;
00408 this->self_changed_ = false;
00409 this->children_changed_ = false;
00410
00411 if (is_persistent ())
00412 {
00413 TAO_Notify::NVPList attrs;
00414 this->save_attrs(attrs);
00415
00416 bool want_all_children = saver.begin_object(
00417 this->id(), "channel", attrs, changed);
00418
00419 TAO_Notify::Save_Persist_Worker<TAO_Notify_ConsumerAdmin> ca_wrk(saver, want_all_children);
00420
00421 this->ca_container().collection()->for_each(&ca_wrk);
00422
00423 TAO_Notify::Save_Persist_Worker<TAO_Notify_SupplierAdmin> sa_wrk(saver, want_all_children);
00424 this->sa_container().collection()->for_each(&sa_wrk);
00425
00426 saver.end_object(this->id(), "channel");
00427 }
00428 }
00429
00430 namespace {
00431 template<class T>
00432 void add_attr(TAO_Notify::NVPList& attrs, const T& prop) {
00433 if (prop.is_valid())
00434 {
00435 attrs.push_back(TAO_Notify::NVP (prop));
00436 }
00437 }
00438 }
00439
00440 void
00441 TAO_Notify_EventChannel::save_attrs(TAO_Notify::NVPList& attrs)
00442 {
00443 TAO_Notify_Object::save_attrs(attrs);
00444 add_attr(attrs, this->admin_properties().max_global_queue_length());
00445 add_attr(attrs, this->admin_properties().max_consumers());
00446 add_attr(attrs, this->admin_properties().max_suppliers());
00447 add_attr(attrs, this->admin_properties().reject_new_events());
00448 }
00449
00450 void
00451 TAO_Notify_EventChannel::load_attrs(const TAO_Notify::NVPList& attrs)
00452 {
00453 TAO_Notify_Object::load_attrs(attrs);
00454 attrs.load(this->admin_properties().max_global_queue_length());
00455 attrs.load(this->admin_properties().max_consumers());
00456 attrs.load(this->admin_properties().max_suppliers());
00457 attrs.load(this->admin_properties().reject_new_events());
00458 this->admin_properties().init();
00459 }
00460
00461 TAO_Notify::Topology_Object *
00462 TAO_Notify_EventChannel::load_child (const ACE_CString &type,
00463 CORBA::Long id,
00464 const TAO_Notify::NVPList& attrs)
00465 {
00466 TAO_Notify::Topology_Object* result = this;
00467 if (type == "consumer_admin")
00468 {
00469 if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00470 ACE_TEXT ("(%P|%t) EventChannel reload consumer_admin %d\n")
00471 , static_cast<int> (id)
00472 ));
00473
00474
00475 TAO_Notify_Builder* bld = TAO_Notify_PROPERTIES::instance()->builder();
00476 TAO_Notify_ConsumerAdmin * ca = bld->build_consumer_admin (
00477 this,
00478 id);
00479 ca->load_attrs (attrs);
00480 if (ca->is_default ())
00481 {
00482 CORBA::Object_var caob = this->poa()->servant_to_reference (ca);
00483 this->default_consumer_admin_ =
00484 CosNotifyChannelAdmin::ConsumerAdmin::_narrow (
00485 caob.in ());
00486 }
00487 result = ca;
00488 }
00489 else if (type == "supplier_admin")
00490 {
00491 if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00492 ACE_TEXT ("(%P|%t) EventChannel reload supplier_admin %d\n")
00493 , static_cast<int> (id)
00494 ));
00495 TAO_Notify_Builder* bld = TAO_Notify_PROPERTIES::instance()->builder();
00496
00497 TAO_Notify_SupplierAdmin * sa = bld->build_supplier_admin (
00498 this,
00499 id);
00500 sa->load_attrs (attrs);
00501 if (sa->is_default ())
00502 {
00503 CORBA::Object_var saob = this->poa()->servant_to_reference (sa);
00504 this->default_supplier_admin_ =
00505 CosNotifyChannelAdmin::SupplierAdmin::_narrow (
00506 saob.in ());
00507 }
00508 result = sa;
00509 }
00510 return result;
00511 }
00512 TAO_Notify_ProxyConsumer *
00513 TAO_Notify_EventChannel::find_proxy_consumer (TAO_Notify::IdVec & id_path, size_t position)
00514 {
00515 TAO_Notify_ProxyConsumer * result = 0;
00516 size_t path_size = id_path.size ();
00517 if (position < path_size)
00518 {
00519 TAO_Notify_SupplierAdmin_Find_Worker find_worker;
00520 TAO_Notify_SupplierAdmin * admin = find_worker.find (id_path[position], this->sa_container());
00521 ++position;
00522 if (admin != 0)
00523 {
00524 result = admin->find_proxy_consumer (id_path, position);
00525 }
00526 }
00527 return result;
00528 }
00529
00530 TAO_Notify_ProxySupplier *
00531 TAO_Notify_EventChannel::find_proxy_supplier (TAO_Notify::IdVec & id_path, size_t position)
00532 {
00533 TAO_Notify_ProxySupplier * result = 0;
00534 size_t path_size = id_path.size ();
00535 if (position < path_size)
00536 {
00537 TAO_Notify_ConsumerAdmin_Find_Worker find_worker;
00538 TAO_Notify_ConsumerAdmin * admin = find_worker.find (id_path[position], this->ca_container());
00539 ++position;
00540 if (admin != 0)
00541 {
00542 result = admin->find_proxy_supplier (id_path, position);
00543 }
00544 }
00545 return result;
00546 }
00547
00548
00549 void
00550 TAO_Notify_EventChannel::reconnect (void)
00551 {
00552 TAO_Notify::Reconnect_Worker<TAO_Notify_ConsumerAdmin> ca_wrk;
00553 this->ca_container().collection()->for_each(&ca_wrk);
00554
00555 TAO_Notify::Reconnect_Worker<TAO_Notify_SupplierAdmin> sa_wrk;
00556 this->sa_container().collection()->for_each(&sa_wrk);
00557 }
00558
00559 TAO_Notify_EventChannel::TAO_Notify_ConsumerAdmin_Container&
00560 TAO_Notify_EventChannel::ca_container()
00561 {
00562 ACE_ASSERT( this->ca_container_.get() != 0 );
00563 return *ca_container_;
00564 }
00565
00566 TAO_Notify_EventChannel::TAO_Notify_SupplierAdmin_Container&
00567 TAO_Notify_EventChannel::sa_container()
00568 {
00569 ACE_ASSERT( this->sa_container_.get() != 0 );
00570 return *sa_container_;
00571 }
00572
00573 TAO_END_VERSIONED_NAMESPACE_DECL