00001
00002
00003 #include "orbsvcs/Notify/EventChannelFactory.h"
00004
00005 ACE_RCSID(Notify,
00006 TAO_Notify_EventChannelFactory,
00007 "$Id: EventChannelFactory.cpp 80974 2008-03-17 13:40:44Z johnnyw $")
00008
00009 #include "orbsvcs/Notify/Properties.h"
00010 #include "orbsvcs/Notify/Factory.h"
00011 #include "orbsvcs/Notify/Builder.h"
00012 #include "orbsvcs/Notify/Topology_Saver.h"
00013 #include "orbsvcs/Notify/Topology_Loader.h"
00014 #include "orbsvcs/Notify/Save_Persist_Worker_T.h"
00015 #include "orbsvcs/Notify/Reconnect_Worker_T.h"
00016 #include "orbsvcs/Notify/Event_Persistence_Strategy.h"
00017 #include "orbsvcs/Notify/Routing_Slip_Persistence_Manager.h"
00018 #include "orbsvcs/Notify/EventChannel.h"
00019 #include "orbsvcs/Notify/Container_T.h"
00020 #include "orbsvcs/Notify/Find_Worker_T.h"
00021 #include "orbsvcs/Notify/Seq_Worker_T.h"
00022 #include "orbsvcs/Notify/POA_Helper.h"
00023
00024 #include "ace/Dynamic_Service.h"
00025
00026 #include "tao/debug.h"
00027
00028 #ifndef DEBUG_LEVEL
00029 # define DEBUG_LEVEL TAO_debug_level
00030 #endif //DEBUG_LEVEL
00031
00032 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00033
00034
00035
00036 namespace TAO_Notify
00037 {
00038
00039 Topology_Factory::~Topology_Factory ()
00040 {
00041 }
00042 }
00043
00044 typedef TAO_Notify_Find_Worker_T<TAO_Notify_EventChannel
00045 , CosNotifyChannelAdmin::EventChannel
00046 , CosNotifyChannelAdmin::EventChannel_ptr
00047 , CosNotifyChannelAdmin::ChannelNotFound>
00048 TAO_Notify_EventChannel_Find_Worker;
00049
00050 typedef TAO_Notify_Seq_Worker_T<TAO_Notify_EventChannel> TAO_Notify_EventChannel_Seq_Worker;
00051
00052 TAO_Notify_EventChannelFactory::TAO_Notify_EventChannelFactory (void)
00053 : topology_save_seq_ (0)
00054 , topology_factory_(0)
00055 , reconnect_registry_(*this)
00056 , loading_topology_ (false)
00057 {
00058 }
00059
00060 TAO_Notify_EventChannelFactory::~TAO_Notify_EventChannelFactory ()
00061 {
00062 }
00063
00064 void
00065 TAO_Notify_EventChannelFactory::destroy (void)
00066 {
00067 if (this->shutdown () == 1)
00068 return;
00069
00070 TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance();
00071
00072
00073 properties->orb (CORBA::ORB::_nil ());
00074 properties->default_poa (PortableServer::POA::_nil ());
00075
00076 ec_container_.reset( 0 );
00077 }
00078
00079 void
00080 TAO_Notify_EventChannelFactory::init (PortableServer::POA_ptr poa)
00081 {
00082 ACE_ASSERT (this->ec_container_.get() == 0);
00083
00084
00085 TAO_Notify_EventChannel_Container* ecc = 0;
00086 ACE_NEW_THROW_EX (ecc,
00087 TAO_Notify_EventChannel_Container (),
00088 CORBA::INTERNAL ());
00089 this->ec_container_.reset( ecc );
00090
00091 this->ec_container().init ();
00092
00093 TAO_Notify_POA_Helper* object_poa = 0;
00094
00095
00096 ACE_NEW_THROW_EX (object_poa,
00097 TAO_Notify_POA_Helper (),
00098 CORBA::NO_MEMORY ());
00099
00100 ACE_Auto_Ptr<TAO_Notify_POA_Helper> auto_object_poa (object_poa);
00101
00102 ACE_CString poa_name = object_poa->get_unique_id ();
00103 #if defined (CORBA_E_MICRO)
00104 object_poa->init (poa, poa_name.c_str ());
00105 #else
00106 object_poa->init_persistent (poa, poa_name.c_str ());
00107 #endif
00108
00109 this->adopt_poa (auto_object_poa.release ());
00110
00111
00112
00113 this->topology_factory_ =
00114 ACE_Dynamic_Service <TAO_Notify::Topology_Factory>::instance ("Topology_Factory");
00115
00116 this->load_topology ();
00117
00118 this->load_event_persistence ();
00119 }
00120
00121 void
00122 TAO_Notify_EventChannelFactory::_add_ref (void)
00123 {
00124 this->_incr_refcnt ();
00125 }
00126
00127 void
00128 TAO_Notify_EventChannelFactory::_remove_ref (void)
00129 {
00130 this->_decr_refcnt ();
00131 }
00132
00133 void
00134 TAO_Notify_EventChannelFactory::release (void)
00135 {
00136 delete this;
00137
00138 }
00139
00140 void
00141 TAO_Notify_EventChannelFactory::remove (TAO_Notify_EventChannel* event_channel)
00142 {
00143 this->ec_container().remove (event_channel);
00144 this->self_change ();
00145 }
00146
00147 int
00148 TAO_Notify_EventChannelFactory::shutdown (void)
00149 {
00150 if (TAO_Notify_Object::shutdown () == 1)
00151 return 1;
00152
00153 this->ec_container().shutdown ();
00154
00155 return 0;
00156 }
00157
00158 CosNotifyChannelAdmin::EventChannel_ptr
00159 TAO_Notify_EventChannelFactory::create_named_channel (
00160 const CosNotification::QoSProperties& initial_qos,
00161 const CosNotification::AdminProperties& initial_admin,
00162 CosNotifyChannelAdmin::ChannelID_out id,
00163 const char*)
00164 {
00165 return this->create_channel (initial_qos, initial_admin, id);
00166 }
00167
00168 ::CosNotifyChannelAdmin::EventChannel_ptr TAO_Notify_EventChannelFactory::create_channel (
00169 const CosNotification::QoSProperties & initial_qos,
00170 const CosNotification::AdminProperties & initial_admin,
00171 CosNotifyChannelAdmin::ChannelID_out id
00172 )
00173 {
00174 CosNotifyChannelAdmin::EventChannel_var ec =
00175 TAO_Notify_PROPERTIES::instance()->builder()->build_event_channel (this
00176 , initial_qos
00177 , initial_admin
00178 , id);
00179 this->self_change ();
00180 return ec._retn ();
00181 }
00182
00183 CosNotifyChannelAdmin::ChannelIDSeq*
00184 TAO_Notify_EventChannelFactory::get_all_channels (void)
00185 {
00186 TAO_Notify_EventChannel_Seq_Worker seq_worker;
00187
00188 return seq_worker.create (this->ec_container());
00189 }
00190
00191 CosNotifyChannelAdmin::EventChannel_ptr
00192 TAO_Notify_EventChannelFactory::get_event_channel (CosNotifyChannelAdmin::ChannelID id)
00193 {
00194 TAO_Notify_EventChannel_Find_Worker find_worker;
00195
00196 return find_worker.resolve (id, this->ec_container());
00197 }
00198
00199 void
00200 TAO_Notify_EventChannelFactory::set_topology_factory(TAO_Notify::Topology_Factory* f)
00201 {
00202 ACE_DEBUG ((LM_DEBUG,
00203 ACE_TEXT ("(%P,%t) Debug Topology_Factory installed in EventChannelFactory.\n")
00204 ));
00205
00206
00207
00208 this->topology_factory_ = f;
00209 }
00210
00211 void
00212 TAO_Notify_EventChannelFactory::load_topology (void)
00213 {
00214 this->loading_topology_ = true;
00215 if (this->topology_factory_ != 0)
00216 {
00217
00218 auto_ptr<TAO_Notify::Topology_Loader> tl(this->topology_factory_->create_loader());
00219 if (tl.get () != 0)
00220 {
00221 tl->load (this);
00222 }
00223 }
00224 else
00225 {
00226 if (TAO_debug_level > 0)
00227 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Topology persistence disabled.\n")));
00228 }
00229 this->loading_topology_ = false;
00230 }
00231 bool
00232 TAO_Notify_EventChannelFactory::is_persistent () const
00233 {
00234 return true;
00235 }
00236
00237 void
00238 TAO_Notify_EventChannelFactory::save_persistent (TAO_Notify::Topology_Saver& saver)
00239 {
00240 bool changed = this->self_changed_;
00241 this->self_changed_ = false;
00242 this->children_changed_ = false;
00243
00244 TAO_Notify::NVPList attrs;
00245
00246 bool want_all_children =
00247 saver.begin_object(0, "channel_factory", attrs, changed);
00248
00249
00250
00251
00252 TAO_Notify::Save_Persist_Worker<TAO_Notify_EventChannel> wrk(saver, want_all_children);
00253
00254 this->ec_container().collection()->for_each(&wrk);
00255
00256 if (want_all_children || this->reconnect_registry_.is_changed ())
00257 {
00258 this->reconnect_registry_.save_persistent(saver);
00259 }
00260 saver.end_object(0, "channel_factory");
00261 }
00262
00263 void
00264 TAO_Notify_EventChannelFactory::load_event_persistence (void)
00265 {
00266 TAO_Notify::Event_Persistence_Strategy * strategy =
00267 ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence");
00268 if (strategy != 0)
00269 {
00270 if (this->topology_factory_ != 0)
00271 {
00272 TAO_Notify::Event_Persistence_Factory * factory = strategy->get_factory ();
00273 if (factory != 0)
00274 {
00275 for (
00276 TAO_Notify::Routing_Slip_Persistence_Manager * rspm = factory->first_reload_manager();
00277 rspm != 0;
00278 rspm = rspm->load_next ())
00279 {
00280 TAO_Notify::Routing_Slip_Ptr routing_slip = TAO_Notify::Routing_Slip::create (*this, rspm);
00281 if (!routing_slip.null ())
00282 {
00283 this->routing_slip_restart_set_.insert (routing_slip);
00284 }
00285 else
00286 {
00287
00288
00289 ACE_DEBUG ((LM_DEBUG,
00290 ACE_TEXT ("(%P|%t) Reload persistent event failed.\n")
00291 ));
00292 }
00293 }
00294 }
00295 }
00296 else
00297 {
00298 ACE_ERROR ((LM_ERROR,
00299 ACE_TEXT ("(%P|%t) Notify Service: Configuration error. Event Persistence requires Topology Persistence.\n")
00300 ));
00301 throw CORBA::PERSIST_STORE();
00302 }
00303 }
00304 }
00305
00306 bool
00307 TAO_Notify_EventChannelFactory::change_to_parent (void)
00308 {
00309 bool saving = false;
00310 if (! this->loading_topology_)
00311 {
00312
00313 if (this->topology_factory_ != 0)
00314 {
00315 saving = true;
00316
00317
00318
00319
00320 short seq = this->topology_save_seq_;
00321 ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->topology_save_lock_, CORBA::INTERNAL ());
00322 if (seq == this->topology_save_seq_)
00323 {
00324 auto_ptr<TAO_Notify::Topology_Saver> saver(this->topology_factory_->create_saver());
00325 if (saver.get() != 0)
00326 {
00327 this->save_persistent(*saver);
00328 saver->close ();
00329 }
00330 this->topology_save_seq_ += 1;
00331 }
00332 }
00333 }
00334 return saving;
00335 }
00336
00337 TAO_Notify::Topology_Object*
00338 TAO_Notify_EventChannelFactory::load_child (const ACE_CString& type,
00339 CORBA::Long id,
00340 const TAO_Notify::
00341 NVPList& attrs)
00342 {
00343
00344 TAO_Notify::Topology_Object * result = this;
00345 if (type == "channel")
00346 {
00347 if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00348 ACE_TEXT ("(%P|%t) EventChannelFactory reload channel %d\n")
00349 , static_cast<int> (id)
00350 ));
00351
00352 TAO_Notify_Builder* bld = TAO_Notify_PROPERTIES::instance()->builder();
00353 TAO_Notify_EventChannel * ec = bld->build_event_channel(
00354 this ,
00355 id);
00356
00357 ec->load_attrs (attrs);
00358
00359 result = ec;
00360 }
00361 else if (type == TAO_Notify::REGISTRY_TYPE)
00362 {
00363 result = & this->reconnect_registry_;
00364 }
00365 return result;
00366 }
00367
00368 void
00369 TAO_Notify_EventChannelFactory::reconnect (void)
00370 {
00371
00372 TAO_Notify::Reconnect_Worker<TAO_Notify_EventChannel> wrk;
00373
00374 this->ec_container().collection()->for_each(&wrk);
00375
00376
00377 ACE_ASSERT (!CORBA::is_nil (this->channel_factory_.in ()));
00378 this->reconnect_registry_.send_reconnect (this->channel_factory_.in ());
00379
00380
00381 Routing_Slip_Set::CONST_ITERATOR iter (this->routing_slip_restart_set_);
00382 TAO_Notify::Routing_Slip_Ptr * routing_slip;
00383 for (iter.first(); iter.next(routing_slip); iter.advance())
00384 {
00385 (*routing_slip)->reconnect();
00386 }
00387 this->routing_slip_restart_set_.reset ();
00388 }
00389
00390 NotifyExt::ReconnectionRegistry::ReconnectionID
00391 TAO_Notify_EventChannelFactory::register_callback (
00392 NotifyExt::ReconnectionCallback_ptr reconnection)
00393 {
00394 return this->reconnect_registry_.register_callback (
00395 reconnection);
00396 }
00397
00398 void
00399 TAO_Notify_EventChannelFactory::unregister_callback (
00400 NotifyExt::ReconnectionRegistry::ReconnectionID id)
00401 {
00402 this->reconnect_registry_.unregister_callback (
00403 id);
00404 }
00405
00406 CORBA::Boolean
00407 TAO_Notify_EventChannelFactory::is_alive (void)
00408 {
00409 return CORBA::Boolean (1);
00410 }
00411
00412 void
00413 TAO_Notify_EventChannelFactory::save_topology (void)
00414 {
00415 this->self_change ();
00416 }
00417
00418 TAO_Notify_ProxyConsumer *
00419 TAO_Notify_EventChannelFactory::find_proxy_consumer (TAO_Notify::IdVec & id_path, size_t position)
00420 {
00421 TAO_Notify_ProxyConsumer * result = 0;
00422 size_t path_size = id_path.size ();
00423
00424
00425
00426 if (position < path_size && id_path[position] == this->id())
00427 {
00428 ++position;
00429 }
00430 if (position < path_size)
00431 {
00432 TAO_Notify_EventChannel_Find_Worker find_worker;
00433
00434 TAO_Notify_EventChannel * ec = find_worker.find (id_path[position], this->ec_container());
00435 ++position;
00436 if (ec != 0)
00437 {
00438 result = ec->find_proxy_consumer (id_path, position);
00439 }
00440 }
00441 return result;
00442 }
00443
00444 TAO_Notify_ProxySupplier *
00445 TAO_Notify_EventChannelFactory::find_proxy_supplier (TAO_Notify::IdVec & id_path, size_t position)
00446 {
00447 TAO_Notify_ProxySupplier * result = 0;
00448 size_t path_size = id_path.size ();
00449
00450
00451
00452 if (position < path_size && id_path[position] == this->id())
00453 {
00454 ++position;
00455 }
00456 if (position < path_size)
00457 {
00458 TAO_Notify_EventChannel_Find_Worker find_worker;
00459 TAO_Notify_EventChannel * ec = find_worker.find (id_path[position], this->ec_container());
00460 ++position;
00461 if (ec != 0)
00462 {
00463 result = ec->find_proxy_supplier (id_path, position);
00464 }
00465 }
00466 return result;
00467 }
00468
00469 CosNotifyChannelAdmin::EventChannelFactory_ptr
00470 TAO_Notify_EventChannelFactory::activate_self (void)
00471 {
00472 CORBA::Object_var obj = this->activate (this);
00473 this->channel_factory_
00474 = CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in());
00475
00476 try
00477 {
00478 if (DEBUG_LEVEL > 9)
00479 {
00480 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) TAO_Notify_EventChannelFactory::activate_self") ));
00481 }
00482 this->reconnect ();
00483 }
00484 catch (const CORBA::Exception&)
00485 {
00486
00487 }
00488 return this->channel_factory_._retn();
00489 }
00490
00491
00492 TAO_Notify_Object::ID
00493 TAO_Notify_EventChannelFactory::get_id () const
00494 {
00495 return id();
00496 }
00497
00498 TAO_Notify_EventChannelFactory::TAO_Notify_EventChannel_Container&
00499 TAO_Notify_EventChannelFactory::ec_container()
00500 {
00501 ACE_ASSERT( this->ec_container_.get() != 0 );
00502 return *ec_container_;
00503 }
00504
00505 TAO_END_VERSIONED_NAMESPACE_DECL