EventChannel.cpp

Go to the documentation of this file.
00001 // $Id: EventChannel.cpp 80974 2008-03-17 13:40:44Z johnnyw $
00002 
00003 #include "orbsvcs/Notify/EventChannel.h"
00004 
00005 #include "orbsvcs/Notify/Container_T.h"
00006 #include "orbsvcs/Notify/EventChannelFactory.h"
00007 #include "orbsvcs/Notify/ConsumerAdmin.h"
00008 #include "orbsvcs/Notify/SupplierAdmin.h"
00009 #include "orbsvcs/Notify/Properties.h"
00010 #include "orbsvcs/Notify/Factory.h"
00011 #include "orbsvcs/Notify/Builder.h"
00012 #include "orbsvcs/Notify/Find_Worker_T.h"
00013 #include "orbsvcs/Notify/Seq_Worker_T.h"
00014 #include "orbsvcs/Notify/Topology_Saver.h"
00015 #include "orbsvcs/Notify/Save_Persist_Worker_T.h"
00016 #include "orbsvcs/Notify/Reconnect_Worker_T.h"
00017 #include "orbsvcs/Notify/Proxy.h"
00018 #include "orbsvcs/Notify/Event_Manager.h"
00019 #include "orbsvcs/Notify/POA_Helper.h"
00020 
00021 #include "tao/debug.h"
00022 //#define DEBUG_LEVEL 9
00023 #ifndef DEBUG_LEVEL
00024 # define DEBUG_LEVEL TAO_debug_level
00025 #endif //DEBUG_LEVEL
00026 
00027 ACE_RCSID(Notify, TAO_Notify_EventChannel, "$Id: EventChannel.cpp 80974 2008-03-17 13:40:44Z johnnyw $")
00028 
00029 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00030 
00031 typedef TAO_Notify_Find_Worker_T<TAO_Notify_ConsumerAdmin
00032                              , CosNotifyChannelAdmin::ConsumerAdmin
00033                              , CosNotifyChannelAdmin::ConsumerAdmin_ptr
00034                              , CosNotifyChannelAdmin::AdminNotFound>
00035 TAO_Notify_ConsumerAdmin_Find_Worker;
00036 
00037 typedef TAO_Notify_Find_Worker_T<TAO_Notify_SupplierAdmin
00038                              , CosNotifyChannelAdmin::SupplierAdmin
00039                              , CosNotifyChannelAdmin::SupplierAdmin_ptr
00040                              , CosNotifyChannelAdmin::AdminNotFound>
00041 TAO_Notify_SupplierAdmin_Find_Worker;
00042 
00043 typedef TAO_Notify_Seq_Worker_T<TAO_Notify_ConsumerAdmin> TAO_Notify_ConsumerAdmin_Seq_Worker;
00044 typedef TAO_Notify_Seq_Worker_T<TAO_Notify_SupplierAdmin> TAO_Notify_SupplierAdmin_Seq_Worker;
00045 
00046 TAO_Notify_EventChannel::TAO_Notify_EventChannel (void)
00047   : ecf_ (0)
00048   , ca_container_ (0)
00049   , sa_container_ (0)
00050   , default_filter_factory_ (CosNotifyFilter::FilterFactory::_nil ())
00051 {
00052 }
00053 
00054 TAO_Notify_EventChannel::~TAO_Notify_EventChannel ()
00055 {
00056 }
00057 
00058 void
00059 TAO_Notify_EventChannel::init (TAO_Notify_EventChannelFactory* ecf
00060                            , const CosNotification::QoSProperties & initial_qos
00061                            , const CosNotification::AdminProperties & initial_admin)
00062 {
00063   ACE_ASSERT (this->ca_container_.get() == 0);
00064 
00065   // this-> on the following line confuses VC6
00066   initialize (ecf);
00067 
00068   this->ecf_.reset (ecf);
00069 
00070   // Init ca_container_
00071   TAO_Notify_ConsumerAdmin_Container* ca_container = 0;
00072   ACE_NEW_THROW_EX (ca_container,
00073                     TAO_Notify_ConsumerAdmin_Container (),
00074                     CORBA::INTERNAL ());
00075   this->ca_container_.reset (ca_container);
00076 
00077   this->ca_container().init ();
00078 
00079   // Init ca_container_
00080   TAO_Notify_SupplierAdmin_Container* sa_container = 0;
00081   ACE_NEW_THROW_EX (sa_container,
00082                     TAO_Notify_SupplierAdmin_Container (),
00083                     CORBA::INTERNAL ());
00084   this->sa_container_.reset (sa_container);
00085 
00086   this->sa_container().init ();
00087 
00088   // Set the admin properties.
00089   TAO_Notify_AdminProperties* admin_properties = 0;
00090   ACE_NEW_THROW_EX (admin_properties,
00091                     TAO_Notify_AdminProperties (),
00092                     CORBA::NO_MEMORY ());
00093   this->set_admin_properties (admin_properties);
00094 
00095   // create the event manager. @@ use factory
00096   TAO_Notify_Event_Manager* event_manager = 0;
00097   ACE_NEW_THROW_EX (event_manager,
00098                     TAO_Notify_Event_Manager (),
00099                     CORBA::INTERNAL ());
00100   this->set_event_manager (event_manager);
00101 
00102   this->event_manager().init ();
00103 
00104   const CosNotification::QoSProperties &default_ec_qos =
00105     TAO_Notify_PROPERTIES::instance ()->default_event_channel_qos_properties ();
00106 
00107   this->set_qos (default_ec_qos);
00108 
00109   this->set_qos (initial_qos);
00110 
00111   this->set_admin (initial_admin);
00112 
00113   PortableServer::POA_var default_poa = TAO_Notify_PROPERTIES::instance ()->default_poa ();
00114   this->default_filter_factory_ =
00115     TAO_Notify_PROPERTIES::instance()->builder()->build_filter_factory (default_poa.in());
00116 
00117   // Note originally default admins were allocated here, bt this caused problems
00118   // attempting to save the topology changes before the Event Channel was completely
00119   // constructed and linked to the ECF.
00120   // Lazy evaluation also avoids creating unneded admins.
00121 }
00122 
00123 
00124 void
00125 TAO_Notify_EventChannel::init (TAO_Notify::Topology_Parent* parent)
00126 {
00127   ACE_ASSERT (this->ecf_.get() == 0);
00128   // this-> on the following line confuses VC6
00129   initialize (parent);
00130 
00131   this->ecf_.reset (dynamic_cast <TAO_Notify_EventChannelFactory*>(parent));
00132   ACE_ASSERT (this->ecf_.get() !=0);
00133 
00134   // Init ca_container_
00135   TAO_Notify_ConsumerAdmin_Container* ca_container = 0;
00136   ACE_NEW_THROW_EX (ca_container,
00137                     TAO_Notify_ConsumerAdmin_Container (),
00138                     CORBA::INTERNAL ());
00139   this->ca_container_.reset (ca_container);
00140 
00141   this->ca_container().init ();
00142 
00143   TAO_Notify_SupplierAdmin_Container* sa_container = 0;
00144   // Init ca_container_
00145   ACE_NEW_THROW_EX (sa_container,
00146                     TAO_Notify_SupplierAdmin_Container (),
00147                     CORBA::INTERNAL ());
00148   this->sa_container_.reset (sa_container);
00149 
00150   this->sa_container().init ();
00151 
00152   // Set the admin properties.
00153   TAO_Notify_AdminProperties* admin_properties = 0;
00154   ACE_NEW_THROW_EX (admin_properties,
00155                     TAO_Notify_AdminProperties (),
00156                     CORBA::NO_MEMORY ());
00157   this->set_admin_properties (admin_properties);
00158 
00159   // create the event manager. @@ use factory
00160   TAO_Notify_Event_Manager* event_manager = 0;
00161   ACE_NEW_THROW_EX (event_manager,
00162                     TAO_Notify_Event_Manager (),
00163                     CORBA::INTERNAL ());
00164   this->set_event_manager (event_manager);
00165 
00166   this->event_manager().init ();
00167 
00168   const CosNotification::QoSProperties &default_ec_qos =
00169     TAO_Notify_PROPERTIES::instance ()->default_event_channel_qos_properties ();
00170 
00171   this->set_qos (default_ec_qos);
00172 
00173   PortableServer::POA_var default_poa = TAO_Notify_PROPERTIES::instance ()->default_poa ();
00174   this->default_filter_factory_ =
00175     TAO_Notify_PROPERTIES::instance()->builder()->build_filter_factory (default_poa.in ());
00176 }
00177 
00178 
00179 void
00180 TAO_Notify_EventChannel::_add_ref (void)
00181 {
00182   this->_incr_refcnt ();
00183 }
00184 
00185 void
00186 TAO_Notify_EventChannel::_remove_ref (void)
00187 {
00188   this->_decr_refcnt ();
00189 }
00190 
00191 void
00192 TAO_Notify_EventChannel::release (void)
00193 {
00194   delete this;
00195   //@@ inform factory
00196 }
00197 
00198 void
00199 TAO_Notify_EventChannel::cleanup_proxy (CosNotifyChannelAdmin::ProxyID, bool)
00200 { }
00201 
00202 int
00203 TAO_Notify_EventChannel::shutdown (void)
00204 {
00205   int sd_ret = TAO_Notify_Object::shutdown ();
00206 
00207   if (sd_ret == 1)
00208     return 1;
00209 
00210   this->ca_container().shutdown ();
00211 
00212   this->sa_container().shutdown ();
00213 
00214   this->event_manager().shutdown ();
00215 
00216   return 0;
00217 }
00218 
00219 void
00220 TAO_Notify_EventChannel::destroy (void)
00221 {
00222   TAO_Notify_EventChannel::Ptr guard( this );
00223 
00224   if (this->shutdown () == 1)
00225     return;
00226 
00227   this->ecf_->remove (this);
00228 
00229   this->sa_container ().destroy ();
00230   this->ca_container ().destroy ();
00231 
00232   this->sa_container_.reset( 0 );
00233   this->ca_container_.reset( 0 );
00234 
00235   this->default_filter_factory_ = CosNotifyFilter::FilterFactory::_nil();
00236 }
00237 
00238 void
00239 TAO_Notify_EventChannel::remove (TAO_Notify_ConsumerAdmin* consumer_admin)
00240 {
00241   this->ca_container().remove (consumer_admin);
00242 }
00243 
00244 void
00245 TAO_Notify_EventChannel::remove (TAO_Notify_SupplierAdmin* supplier_admin)
00246 {
00247   this->sa_container().remove (supplier_admin);
00248 }
00249 
00250 void
00251 TAO_Notify_EventChannel::set_qos (const CosNotification::QoSProperties & qos)
00252 {
00253   this->TAO_Notify_Object::set_qos (qos);
00254 }
00255 
00256 CosNotification::QoSProperties*
00257 TAO_Notify_EventChannel::get_qos (void)
00258 {
00259   return this->TAO_Notify_Object::get_qos ();
00260 }
00261 
00262 CosNotifyChannelAdmin::EventChannelFactory_ptr
00263 TAO_Notify_EventChannel::MyFactory (void)
00264 {
00265   return this->ecf_->_this ();
00266 }
00267 
00268 CosNotifyChannelAdmin::ConsumerAdmin_ptr
00269 TAO_Notify_EventChannel::default_consumer_admin (void)
00270 {
00271   if (CORBA::is_nil (default_consumer_admin_.in ()))
00272     {
00273       ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->default_admin_mutex_, CosNotifyChannelAdmin::ConsumerAdmin::_nil());
00274       if (CORBA::is_nil (default_consumer_admin_.in ()))
00275         {
00276           CosNotifyChannelAdmin::AdminID id;
00277           this->default_consumer_admin_ = this->new_for_consumers (CosNotifyChannelAdmin::OR_OP, id);
00278           // Wish there was a better way to do this!
00279           PortableServer::ServantBase * admin_servant =
00280             this->poa()->reference_to_servant (
00281               this->default_consumer_admin_.in ());
00282           TAO_Notify_Admin * pAdmin = dynamic_cast <TAO_Notify_Admin *> (admin_servant);
00283           ACE_ASSERT (pAdmin != 0); // if this assert triggers, we have mixed implementations?
00284           if (pAdmin != 0)
00285             {
00286               pAdmin->set_default (true);
00287             }
00288         }
00289     }
00290   return CosNotifyChannelAdmin::ConsumerAdmin::_duplicate (this->default_consumer_admin_.in ());
00291 }
00292 
00293 CosNotifyChannelAdmin::SupplierAdmin_ptr
00294 TAO_Notify_EventChannel::default_supplier_admin (void)
00295 {
00296   if (CORBA::is_nil (default_supplier_admin_.in ()))
00297     {
00298       ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->default_admin_mutex_, CosNotifyChannelAdmin::SupplierAdmin::_nil());
00299       if (CORBA::is_nil (default_supplier_admin_.in ()))
00300         {
00301           CosNotifyChannelAdmin::AdminID id;
00302           this->default_supplier_admin_ = this->new_for_suppliers (CosNotifyChannelAdmin::OR_OP, id);
00303           PortableServer::ServantBase * admin_servant =
00304             this->poa()->poa()->reference_to_servant (
00305               this->default_supplier_admin_.in ());
00306           TAO_Notify_Admin * pAdmin = dynamic_cast <TAO_Notify_Admin *> (admin_servant);
00307           ACE_ASSERT (pAdmin != 0); // if this assert triggers, we have mixed implementations?
00308           if (pAdmin != 0)
00309             {
00310               pAdmin->set_default (true);
00311             }
00312         }
00313     }
00314   return CosNotifyChannelAdmin::SupplierAdmin::_duplicate (this->default_supplier_admin_.in ());
00315 }
00316 
00317 ::CosNotifyFilter::FilterFactory_ptr
00318 TAO_Notify_EventChannel::default_filter_factory (void)
00319 {
00320   return CosNotifyFilter::FilterFactory::_duplicate (this->default_filter_factory_.in ());
00321 }
00322 
00323 ::CosNotifyChannelAdmin::ConsumerAdmin_ptr
00324 TAO_Notify_EventChannel::new_for_consumers (CosNotifyChannelAdmin::InterFilterGroupOperator op,
00325                                         CosNotifyChannelAdmin::AdminID_out id)
00326 {
00327   ::CosNotifyChannelAdmin::ConsumerAdmin_var ca =
00328     TAO_Notify_PROPERTIES::instance()->builder()->build_consumer_admin (this, op, id);
00329   this->self_change ();
00330   return ca._retn ();
00331 }
00332 
00333 ::CosNotifyChannelAdmin::SupplierAdmin_ptr
00334 TAO_Notify_EventChannel::new_for_suppliers (CosNotifyChannelAdmin::InterFilterGroupOperator op,
00335                                         CosNotifyChannelAdmin::AdminID_out id)
00336 {
00337   ::CosNotifyChannelAdmin::SupplierAdmin_var sa =
00338     TAO_Notify_PROPERTIES::instance()->builder()->build_supplier_admin (this, op, id);
00339   this->self_change ();
00340   return sa._retn ();
00341 }
00342 
00343 CosNotifyChannelAdmin::ConsumerAdmin_ptr
00344 TAO_Notify_EventChannel::get_consumeradmin (CosNotifyChannelAdmin::AdminID id)
00345 {
00346   TAO_Notify_ConsumerAdmin_Find_Worker find_worker;
00347 
00348   return find_worker.resolve (id, this->ca_container());
00349 }
00350 
00351 CosNotifyChannelAdmin::SupplierAdmin_ptr
00352 TAO_Notify_EventChannel::get_supplieradmin (CosNotifyChannelAdmin::AdminID id)
00353 {
00354   TAO_Notify_SupplierAdmin_Find_Worker find_worker;
00355 
00356   return find_worker.resolve (id, this->sa_container());
00357 }
00358 
00359 CosNotifyChannelAdmin::AdminIDSeq*
00360 TAO_Notify_EventChannel::get_all_consumeradmins (void)
00361 {
00362   TAO_Notify_ConsumerAdmin_Seq_Worker seq_worker;
00363 
00364   return seq_worker.create (this->ca_container());
00365 }
00366 
00367 CosNotifyChannelAdmin::AdminIDSeq*
00368 TAO_Notify_EventChannel::get_all_supplieradmins (void)
00369 {
00370   TAO_Notify_SupplierAdmin_Seq_Worker seq_worker;
00371 
00372   return seq_worker.create (this->sa_container());
00373 }
00374 
00375 void
00376 TAO_Notify_EventChannel::set_admin (const CosNotification::AdminProperties & admin)
00377 {
00378   this->admin_properties().init (admin);
00379 }
00380 
00381 CosNotification::AdminProperties*
00382 TAO_Notify_EventChannel::get_admin (void)
00383 {
00384   CosNotification::AdminProperties_var properties;
00385 
00386   ACE_NEW_THROW_EX (properties,
00387                     CosNotification::AdminProperties (),
00388                     CORBA::NO_MEMORY ());
00389 
00390   this->admin_properties().populate (properties);
00391 
00392   return properties._retn ();
00393 }
00394 
00395 CosEventChannelAdmin::ConsumerAdmin_ptr
00396 TAO_Notify_EventChannel::for_consumers (void)
00397 {
00398   return this->default_consumer_admin();
00399 }
00400 
00401 CosEventChannelAdmin::SupplierAdmin_ptr
00402 TAO_Notify_EventChannel::for_suppliers (void)
00403 {
00404   return this->default_supplier_admin ();
00405 }
00406 
00407 void
00408 TAO_Notify_EventChannel::validate_qos (const CosNotification::QoSProperties & /*required_qos*/,
00409                                    CosNotification::NamedPropertyRangeSeq_out /*available_qos*/)
00410 {
00411   throw CORBA::NO_IMPLEMENT ();
00412 }
00413 
00414 void
00415 TAO_Notify_EventChannel::save_persistent (TAO_Notify::Topology_Saver& saver)
00416 {
00417   bool changed = this->self_changed_;
00418   this->self_changed_ = false;
00419   this->children_changed_ = false;
00420 
00421   if (is_persistent ())
00422   {
00423     TAO_Notify::NVPList attrs;
00424     this->save_attrs(attrs);
00425 
00426     bool want_all_children = saver.begin_object(
00427       this->id(), "channel", attrs, changed);
00428 
00429     TAO_Notify::Save_Persist_Worker<TAO_Notify_ConsumerAdmin> ca_wrk(saver, want_all_children);
00430 
00431     this->ca_container().collection()->for_each(&ca_wrk);
00432 
00433     TAO_Notify::Save_Persist_Worker<TAO_Notify_SupplierAdmin> sa_wrk(saver, want_all_children);
00434     this->sa_container().collection()->for_each(&sa_wrk);
00435 
00436     saver.end_object(this->id(), "channel");
00437   }
00438 }
00439 
00440 namespace {
00441   template<class T>
00442     void add_attr(TAO_Notify::NVPList& attrs, const T& prop) {
00443       if (prop.is_valid())
00444       {
00445         attrs.push_back(TAO_Notify::NVP (prop));
00446       }
00447     }
00448 }
00449 
00450 void
00451 TAO_Notify_EventChannel::save_attrs(TAO_Notify::NVPList& attrs)
00452 {
00453   TAO_Notify_Object::save_attrs(attrs);
00454   add_attr(attrs, this->admin_properties().max_global_queue_length());
00455   add_attr(attrs, this->admin_properties().max_consumers());
00456   add_attr(attrs, this->admin_properties().max_suppliers());
00457   add_attr(attrs, this->admin_properties().reject_new_events());
00458 }
00459 
00460 void
00461 TAO_Notify_EventChannel::load_attrs(const TAO_Notify::NVPList& attrs)
00462 {
00463   TAO_Notify_Object::load_attrs(attrs);
00464   attrs.load(this->admin_properties().max_global_queue_length());
00465   attrs.load(this->admin_properties().max_consumers());
00466   attrs.load(this->admin_properties().max_suppliers());
00467   attrs.load(this->admin_properties().reject_new_events());
00468   this->admin_properties().init();
00469 }
00470 
00471 TAO_Notify::Topology_Object *
00472 TAO_Notify_EventChannel::load_child (const ACE_CString &type,
00473                                                   CORBA::Long id,
00474                                                   const TAO_Notify::NVPList& attrs)
00475 {
00476   TAO_Notify::Topology_Object* result = this;
00477   if (type == "consumer_admin")
00478   {
00479     if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00480       ACE_TEXT ("(%P|%t) EventChannel reload consumer_admin %d\n")
00481       , static_cast<int> (id)
00482       ));
00483 
00484     // call special builder method to reload
00485     TAO_Notify_Builder* bld = TAO_Notify_PROPERTIES::instance()->builder();
00486     TAO_Notify_ConsumerAdmin * ca = bld->build_consumer_admin (
00487       this,
00488       id);
00489     ca->load_attrs (attrs);
00490     if (ca->is_default ())
00491       {
00492         CORBA::Object_var caob = this->poa()->servant_to_reference (ca);
00493         this->default_consumer_admin_ =
00494           CosNotifyChannelAdmin::ConsumerAdmin::_narrow (
00495           caob.in ());
00496       }
00497     result = ca;
00498   }
00499   else if (type == "supplier_admin")
00500   {
00501     if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00502       ACE_TEXT ("(%P|%t) EventChannel reload supplier_admin %d\n")
00503       , static_cast<int> (id)
00504       ));
00505     TAO_Notify_Builder* bld = TAO_Notify_PROPERTIES::instance()->builder();
00506 
00507     TAO_Notify_SupplierAdmin * sa = bld->build_supplier_admin (
00508       this,
00509       id);
00510     sa->load_attrs (attrs);
00511     if (sa->is_default ())
00512       {
00513         CORBA::Object_var saob = this->poa()->servant_to_reference (sa);
00514         this->default_supplier_admin_ =
00515           CosNotifyChannelAdmin::SupplierAdmin::_narrow (
00516           saob.in ());
00517       }
00518     result = sa;
00519   }
00520   return result;
00521 }
00522 TAO_Notify_ProxyConsumer *
00523 TAO_Notify_EventChannel::find_proxy_consumer (TAO_Notify::IdVec & id_path, size_t position)
00524 {
00525   TAO_Notify_ProxyConsumer * result = 0;
00526   size_t path_size = id_path.size ();
00527   if (position < path_size)
00528   {
00529     TAO_Notify_SupplierAdmin_Find_Worker find_worker;
00530     TAO_Notify_SupplierAdmin * admin = find_worker.find (id_path[position], this->sa_container());
00531     ++position;
00532     if (admin != 0)
00533     {
00534       result = admin->find_proxy_consumer (id_path, position);
00535     }
00536   }
00537   return result;
00538 }
00539 
00540 TAO_Notify_ProxySupplier *
00541 TAO_Notify_EventChannel::find_proxy_supplier (TAO_Notify::IdVec & id_path, size_t position)
00542 {
00543   TAO_Notify_ProxySupplier * result = 0;
00544   size_t path_size = id_path.size ();
00545   if (position < path_size)
00546   {
00547     TAO_Notify_ConsumerAdmin_Find_Worker find_worker;
00548     TAO_Notify_ConsumerAdmin * admin = find_worker.find (id_path[position], this->ca_container());
00549     ++position;
00550     if (admin != 0)
00551     {
00552       result = admin->find_proxy_supplier (id_path, position);
00553     }
00554   }
00555   return result;
00556 }
00557 
00558 
00559 void
00560 TAO_Notify_EventChannel::reconnect (void)
00561 {
00562   TAO_Notify::Reconnect_Worker<TAO_Notify_ConsumerAdmin> ca_wrk;
00563   this->ca_container().collection()->for_each(&ca_wrk);
00564 
00565   TAO_Notify::Reconnect_Worker<TAO_Notify_SupplierAdmin> sa_wrk;
00566   this->sa_container().collection()->for_each(&sa_wrk);
00567 }
00568 
00569 TAO_Notify_EventChannel::TAO_Notify_ConsumerAdmin_Container&
00570 TAO_Notify_EventChannel::ca_container()
00571 {
00572   ACE_ASSERT( this->ca_container_.get() != 0 );
00573   return *ca_container_;
00574 }
00575 
00576 TAO_Notify_EventChannel::TAO_Notify_SupplierAdmin_Container&
00577 TAO_Notify_EventChannel::sa_container()
00578 {
00579   ACE_ASSERT( this->sa_container_.get() != 0 );
00580   return *sa_container_;
00581 }
00582 
00583 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:45:29 2010 for TAO_CosNotification by  doxygen 1.4.7