00001
00002
00003 #include "orbsvcs/Notify/EventChannelFactory.h"
00004
00005 ACE_RCSID(Notify, TAO_Notify_EventChannelFactory, "$Id: EventChannelFactory.cpp 79247 2007-08-07 15:24:22Z elliott_c $")
00006
00007 #include "orbsvcs/Notify/Properties.h"
00008 #include "orbsvcs/Notify/Factory.h"
00009 #include "orbsvcs/Notify/Builder.h"
00010 #include "orbsvcs/Notify/Topology_Saver.h"
00011 #include "orbsvcs/Notify/Topology_Loader.h"
00012 #include "orbsvcs/Notify/Save_Persist_Worker_T.h"
00013 #include "orbsvcs/Notify/Reconnect_Worker_T.h"
00014 #include "orbsvcs/Notify/Event_Persistence_Strategy.h"
00015 #include "orbsvcs/Notify/Routing_Slip_Persistence_Manager.h"
00016 #include "orbsvcs/Notify/EventChannel.h"
00017 #include "orbsvcs/Notify/Container_T.h"
00018 #include "orbsvcs/Notify/Find_Worker_T.h"
00019 #include "orbsvcs/Notify/Seq_Worker_T.h"
00020 #include "orbsvcs/Notify/POA_Helper.h"
00021
00022 #include "ace/Dynamic_Service.h"
00023
00024 #include "tao/debug.h"
00025
00026 #ifndef DEBUG_LEVEL
00027 # define DEBUG_LEVEL TAO_debug_level
00028 #endif //DEBUG_LEVEL
00029
00030 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00031
00032
00033
00034 namespace TAO_Notify
00035 {
00036
00037 Topology_Factory::~Topology_Factory ()
00038 {
00039 }
00040 }
00041
00042 typedef TAO_Notify_Find_Worker_T<TAO_Notify_EventChannel
00043 , CosNotifyChannelAdmin::EventChannel
00044 , CosNotifyChannelAdmin::EventChannel_ptr
00045 , CosNotifyChannelAdmin::ChannelNotFound>
00046 TAO_Notify_EventChannel_Find_Worker;
00047
00048 typedef TAO_Notify_Seq_Worker_T<TAO_Notify_EventChannel> TAO_Notify_EventChannel_Seq_Worker;
00049
00050 TAO_Notify_EventChannelFactory::TAO_Notify_EventChannelFactory (void)
00051 : topology_save_seq_ (0)
00052 , topology_factory_(0)
00053 , reconnect_registry_(*this)
00054 , loading_topology_ (false)
00055 {
00056 }
00057
00058 TAO_Notify_EventChannelFactory::~TAO_Notify_EventChannelFactory ()
00059 {
00060 }
00061
00062 void
00063 TAO_Notify_EventChannelFactory::destroy (void)
00064 {
00065 int result = this->shutdown ();
00066 if ( result == 1)
00067 return;
00068
00069 TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance();
00070
00071
00072 properties->orb (CORBA::ORB::_nil ());
00073 properties->default_poa (PortableServer::POA::_nil ());
00074
00075 ec_container_.reset( 0 );
00076 }
00077
00078 void
00079 TAO_Notify_EventChannelFactory::init (PortableServer::POA_ptr poa)
00080 {
00081 ACE_ASSERT (this->ec_container_.get() == 0);
00082
00083 this->default_filter_factory_ =
00084 TAO_Notify_PROPERTIES::instance()->builder()->build_filter_factory ();
00085
00086
00087 TAO_Notify_EventChannel_Container* ecc = 0;
00088 ACE_NEW_THROW_EX (ecc,
00089 TAO_Notify_EventChannel_Container (),
00090 CORBA::INTERNAL ());
00091 this->ec_container_.reset( ecc );
00092
00093 this->ec_container().init ();
00094
00095 TAO_Notify_POA_Helper* object_poa = 0;
00096
00097
00098 ACE_NEW_THROW_EX (object_poa,
00099 TAO_Notify_POA_Helper (),
00100 CORBA::NO_MEMORY ());
00101
00102 ACE_Auto_Ptr<TAO_Notify_POA_Helper> auto_object_poa (object_poa);
00103
00104 ACE_CString poa_name = object_poa->get_unique_id ();
00105 #if defined (CORBA_E_MICRO)
00106 object_poa->init (poa, poa_name.c_str ());
00107 #else
00108 object_poa->init_persistent (poa, poa_name.c_str ());
00109 #endif
00110
00111 this->adopt_poa (auto_object_poa.release ());
00112
00113
00114
00115 this->topology_factory_ =
00116 ACE_Dynamic_Service <TAO_Notify::Topology_Factory>::instance ("Topology_Factory");
00117
00118 this->load_topology ();
00119
00120 this->load_event_persistence ();
00121 }
00122
00123 void
00124 TAO_Notify_EventChannelFactory::_add_ref (void)
00125 {
00126 this->_incr_refcnt ();
00127 }
00128
00129 void
00130 TAO_Notify_EventChannelFactory::_remove_ref (void)
00131 {
00132 this->_decr_refcnt ();
00133 }
00134
00135 void
00136 TAO_Notify_EventChannelFactory::release (void)
00137 {
00138 delete this;
00139
00140 }
00141
00142 void
00143 TAO_Notify_EventChannelFactory::remove (TAO_Notify_EventChannel* event_channel)
00144 {
00145 this->ec_container().remove (event_channel);
00146 this->self_change ();
00147 }
00148
00149 int
00150 TAO_Notify_EventChannelFactory::shutdown (void)
00151 {
00152 int sd_ret = TAO_Notify_Object::shutdown ();
00153
00154 if (sd_ret == 1)
00155 return 1;
00156
00157 this->ec_container().shutdown ();
00158
00159 return 0;
00160 }
00161
00162 CosNotifyFilter::FilterFactory_ptr
00163 TAO_Notify_EventChannelFactory::get_default_filter_factory (void)
00164 {
00165 return CosNotifyFilter::FilterFactory::_duplicate (this->default_filter_factory_.in ());
00166 }
00167
00168 CosNotifyChannelAdmin::EventChannel_ptr
00169 TAO_Notify_EventChannelFactory::create_named_channel (
00170 const CosNotification::QoSProperties& initial_qos,
00171 const CosNotification::AdminProperties& initial_admin,
00172 CosNotifyChannelAdmin::ChannelID_out id,
00173 const char*)
00174 {
00175 return this->create_channel (initial_qos, initial_admin, id);
00176 }
00177
00178 ::CosNotifyChannelAdmin::EventChannel_ptr TAO_Notify_EventChannelFactory::create_channel (
00179 const CosNotification::QoSProperties & initial_qos,
00180 const CosNotification::AdminProperties & initial_admin,
00181 CosNotifyChannelAdmin::ChannelID_out id
00182 )
00183 {
00184 CosNotifyChannelAdmin::EventChannel_var ec =
00185 TAO_Notify_PROPERTIES::instance()->builder()->build_event_channel (this
00186 , initial_qos
00187 , initial_admin
00188 , id);
00189 this->self_change ();
00190 return ec._retn ();
00191 }
00192
00193 CosNotifyChannelAdmin::ChannelIDSeq*
00194 TAO_Notify_EventChannelFactory::get_all_channels (void)
00195 {
00196 TAO_Notify_EventChannel_Seq_Worker seq_worker;
00197
00198 return seq_worker.create (this->ec_container());
00199 }
00200
00201 CosNotifyChannelAdmin::EventChannel_ptr
00202 TAO_Notify_EventChannelFactory::get_event_channel (CosNotifyChannelAdmin::ChannelID id)
00203 {
00204 TAO_Notify_EventChannel_Find_Worker find_worker;
00205
00206 return find_worker.resolve (id, this->ec_container());
00207 }
00208
00209 void
00210 TAO_Notify_EventChannelFactory::set_topology_factory(TAO_Notify::Topology_Factory* f)
00211 {
00212 ACE_DEBUG ((LM_DEBUG,
00213 ACE_TEXT ("(%P,%t) Debug Topology_Factory installed in EventChannelFactory.\n")
00214 ));
00215
00216
00217
00218 this->topology_factory_ = f;
00219 }
00220
00221 void
00222 TAO_Notify_EventChannelFactory::load_topology (void)
00223 {
00224 this->loading_topology_ = true;
00225 if (this->topology_factory_ != 0)
00226 {
00227
00228 auto_ptr<TAO_Notify::Topology_Loader> tl(this->topology_factory_->create_loader());
00229 if (tl.get () != 0)
00230 {
00231 tl->load (this);
00232 }
00233 }
00234 else
00235 {
00236 if (TAO_debug_level > 0)
00237 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Topology persistence disabled.\n")));
00238 }
00239 this->loading_topology_ = false;
00240 }
00241 bool
00242 TAO_Notify_EventChannelFactory::is_persistent () const
00243 {
00244 return true;
00245 }
00246
00247 void
00248 TAO_Notify_EventChannelFactory::save_persistent (TAO_Notify::Topology_Saver& saver)
00249 {
00250 bool changed = this->self_changed_;
00251 this->self_changed_ = false;
00252 this->children_changed_ = false;
00253
00254 TAO_Notify::NVPList attrs;
00255
00256 bool want_all_children =
00257 saver.begin_object(0, "channel_factory", attrs, changed);
00258
00259
00260
00261
00262 TAO_Notify::Save_Persist_Worker<TAO_Notify_EventChannel> wrk(saver, want_all_children);
00263
00264 this->ec_container().collection()->for_each(&wrk);
00265
00266 if (want_all_children || this->reconnect_registry_.is_changed ())
00267 {
00268 this->reconnect_registry_.save_persistent(saver);
00269 }
00270 saver.end_object(0, "channel_factory");
00271 }
00272
00273 void
00274 TAO_Notify_EventChannelFactory::load_event_persistence (void)
00275 {
00276 TAO_Notify::Event_Persistence_Strategy * strategy =
00277 ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence");
00278 if (strategy != 0)
00279 {
00280 if (this->topology_factory_ != 0)
00281 {
00282 TAO_Notify::Event_Persistence_Factory * factory = strategy->get_factory ();
00283 if (factory != 0)
00284 {
00285 for (
00286 TAO_Notify::Routing_Slip_Persistence_Manager * rspm = factory->first_reload_manager();
00287 rspm != 0;
00288 rspm = rspm->load_next ())
00289 {
00290 TAO_Notify::Routing_Slip_Ptr routing_slip = TAO_Notify::Routing_Slip::create (*this, rspm);
00291 if (!routing_slip.null ())
00292 {
00293 this->routing_slip_restart_set_.insert (routing_slip);
00294 }
00295 else
00296 {
00297
00298
00299 ACE_DEBUG ((LM_DEBUG,
00300 ACE_TEXT ("(%P|%t) Reload persistent event failed.\n")
00301 ));
00302 }
00303 }
00304 }
00305 }
00306 else
00307 {
00308 ACE_ERROR ((LM_ERROR,
00309 ACE_TEXT ("(%P|%t) Notify Service: Configuration error. Event Persistence requires Topology Persistence.\n")
00310 ));
00311 throw CORBA::PERSIST_STORE();
00312 }
00313 }
00314 }
00315
00316 bool
00317 TAO_Notify_EventChannelFactory::change_to_parent (void)
00318 {
00319 bool saving = false;
00320 if (! this->loading_topology_)
00321 {
00322
00323 if (this->topology_factory_ != 0)
00324 {
00325 saving = true;
00326
00327
00328
00329
00330 short seq = this->topology_save_seq_;
00331 ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->topology_save_lock_, CORBA::INTERNAL ());
00332 if (seq == this->topology_save_seq_)
00333 {
00334 auto_ptr<TAO_Notify::Topology_Saver> saver(this->topology_factory_->create_saver());
00335 if (saver.get() != 0)
00336 {
00337 this->save_persistent(*saver);
00338 saver->close ();
00339 }
00340 this->topology_save_seq_ += 1;
00341 }
00342 }
00343 }
00344 return saving;
00345 }
00346
00347 TAO_Notify::Topology_Object*
00348 TAO_Notify_EventChannelFactory::load_child (const ACE_CString& type,
00349 CORBA::Long id,
00350 const TAO_Notify::
00351 NVPList& attrs)
00352 {
00353
00354 TAO_Notify::Topology_Object * result = this;
00355 if (type == "channel")
00356 {
00357 if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00358 ACE_TEXT ("(%P|%t) EventChannelFactory reload channel %d\n")
00359 , static_cast<int> (id)
00360 ));
00361
00362 TAO_Notify_Builder* bld = TAO_Notify_PROPERTIES::instance()->builder();
00363 TAO_Notify_EventChannel * ec = bld->build_event_channel(
00364 this ,
00365 id);
00366
00367 ec->load_attrs (attrs);
00368
00369 result = ec;
00370 }
00371 else if (type == TAO_Notify::REGISTRY_TYPE)
00372 {
00373 result = & this->reconnect_registry_;
00374 }
00375 return result;
00376 }
00377
00378 void
00379 TAO_Notify_EventChannelFactory::reconnect (void)
00380 {
00381
00382 TAO_Notify::Reconnect_Worker<TAO_Notify_EventChannel> wrk;
00383
00384 this->ec_container().collection()->for_each(&wrk);
00385
00386
00387 ACE_ASSERT (!CORBA::is_nil (this->channel_factory_.in ()));
00388 this->reconnect_registry_.send_reconnect (this->channel_factory_.in ());
00389
00390
00391 Routing_Slip_Set::CONST_ITERATOR iter (this->routing_slip_restart_set_);
00392 TAO_Notify::Routing_Slip_Ptr * routing_slip;
00393 for (iter.first(); iter.next(routing_slip); iter.advance())
00394 {
00395 (*routing_slip)->reconnect();
00396 }
00397 this->routing_slip_restart_set_.reset ();
00398 }
00399
00400 NotifyExt::ReconnectionRegistry::ReconnectionID
00401 TAO_Notify_EventChannelFactory::register_callback (
00402 NotifyExt::ReconnectionCallback_ptr reconnection)
00403 {
00404 return this->reconnect_registry_.register_callback (
00405 reconnection);
00406 }
00407
00408 void
00409 TAO_Notify_EventChannelFactory::unregister_callback (
00410 NotifyExt::ReconnectionRegistry::ReconnectionID id)
00411 {
00412 this->reconnect_registry_.unregister_callback (
00413 id);
00414 }
00415
00416 CORBA::Boolean
00417 TAO_Notify_EventChannelFactory::is_alive (void)
00418 {
00419 return CORBA::Boolean (1);
00420 }
00421
00422 void
00423 TAO_Notify_EventChannelFactory::save_topology (void)
00424 {
00425 this->self_change ();
00426 }
00427
00428 TAO_Notify_ProxyConsumer *
00429 TAO_Notify_EventChannelFactory::find_proxy_consumer (TAO_Notify::IdVec & id_path, size_t position)
00430 {
00431 TAO_Notify_ProxyConsumer * result = 0;
00432 size_t path_size = id_path.size ();
00433
00434
00435
00436 if (position < path_size && id_path[position] == this->id())
00437 {
00438 ++position;
00439 }
00440 if (position < path_size)
00441 {
00442 TAO_Notify_EventChannel_Find_Worker find_worker;
00443
00444 TAO_Notify_EventChannel * ec = find_worker.find (id_path[position], this->ec_container());
00445 ++position;
00446 if (ec != 0)
00447 {
00448 result = ec->find_proxy_consumer (id_path, position);
00449 }
00450 }
00451 return result;
00452 }
00453
00454 TAO_Notify_ProxySupplier *
00455 TAO_Notify_EventChannelFactory::find_proxy_supplier (TAO_Notify::IdVec & id_path, size_t position)
00456 {
00457 TAO_Notify_ProxySupplier * result = 0;
00458 size_t path_size = id_path.size ();
00459
00460
00461
00462 if (position < path_size && id_path[position] == this->id())
00463 {
00464 ++position;
00465 }
00466 if (position < path_size)
00467 {
00468 TAO_Notify_EventChannel_Find_Worker find_worker;
00469 TAO_Notify_EventChannel * ec = find_worker.find (id_path[position], this->ec_container());
00470 ++position;
00471 if (ec != 0)
00472 {
00473 result = ec->find_proxy_supplier (id_path, position);
00474 }
00475 }
00476 return result;
00477 }
00478
00479 CosNotifyChannelAdmin::EventChannelFactory_ptr
00480 TAO_Notify_EventChannelFactory::activate_self (void)
00481 {
00482 CORBA::Object_var obj = this->activate (this);
00483 this->channel_factory_
00484 = CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in());
00485
00486 try
00487 {
00488 if (DEBUG_LEVEL > 9)
00489 {
00490 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) TAO_Notify_EventChannelFactory::activate_self") ));
00491 }
00492 this->reconnect ();
00493 }
00494 catch (const CORBA::Exception&)
00495 {
00496
00497 }
00498 return this->channel_factory_._retn();
00499 }
00500
00501
00502 TAO_Notify_Object::ID
00503 TAO_Notify_EventChannelFactory::get_id () const
00504 {
00505 return id();
00506 }
00507
00508 TAO_Notify_EventChannelFactory::TAO_Notify_EventChannel_Container&
00509 TAO_Notify_EventChannelFactory::ec_container()
00510 {
00511 ACE_ASSERT( this->ec_container_.get() != 0 );
00512 return *ec_container_;
00513 }
00514
00515 TAO_END_VERSIONED_NAMESPACE_DECL