00001
00002
00003 #include "orbsvcs/Notify/EventChannelFactory.h"
00004
00005 ACE_RCSID(Notify, TAO_Notify_EventChannelFactory, "EventChannelFactory.cpp,v 1.18 2006/03/14 06:14:34 jtc Exp")
00006
00007 #include "orbsvcs/Notify/Properties.h"
00008 #include "orbsvcs/Notify/Factory.h"
00009 #include "orbsvcs/Notify/Builder.h"
00010 #include "orbsvcs/Notify/Topology_Saver.h"
00011 #include "orbsvcs/Notify/Topology_Loader.h"
00012 #include "orbsvcs/Notify/Save_Persist_Worker_T.h"
00013 #include "orbsvcs/Notify/Reconnect_Worker_T.h"
00014 #include "orbsvcs/Notify/Event_Persistence_Strategy.h"
00015 #include "orbsvcs/Notify/Routing_Slip_Persistence_Manager.h"
00016 #include "orbsvcs/Notify/EventChannel.h"
00017 #include "orbsvcs/Notify/Container_T.h"
00018 #include "orbsvcs/Notify/Find_Worker_T.h"
00019 #include "orbsvcs/Notify/Seq_Worker_T.h"
00020 #include "orbsvcs/Notify/POA_Helper.h"
00021
00022 #include "ace/Dynamic_Service.h"
00023
00024 #include "tao/debug.h"
00025
00026 #ifndef DEBUG_LEVEL
00027 # define DEBUG_LEVEL TAO_debug_level
00028 #endif //DEBUG_LEVEL
00029
00030 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00031
00032
00033
00034 namespace TAO_Notify
00035 {
00036
00037 Topology_Factory::~Topology_Factory ()
00038 {
00039 }
00040 }
00041
00042 typedef TAO_Notify_Find_Worker_T<TAO_Notify_EventChannel
00043 , CosNotifyChannelAdmin::EventChannel
00044 , CosNotifyChannelAdmin::EventChannel_ptr
00045 , CosNotifyChannelAdmin::ChannelNotFound>
00046 TAO_Notify_EventChannel_Find_Worker;
00047
00048 typedef TAO_Notify_Seq_Worker_T<TAO_Notify_EventChannel> TAO_Notify_EventChannel_Seq_Worker;
00049
00050 TAO_Notify_EventChannelFactory::TAO_Notify_EventChannelFactory (void)
00051 : topology_save_seq_ (0)
00052 , topology_factory_(0)
00053 , reconnect_registry_(*this)
00054 , loading_topology_ (false)
00055 {
00056 }
00057
00058 TAO_Notify_EventChannelFactory::~TAO_Notify_EventChannelFactory ()
00059 {
00060 }
00061
00062 void
00063 TAO_Notify_EventChannelFactory::destroy (ACE_ENV_SINGLE_ARG_DECL)
00064 ACE_THROW_SPEC ((
00065 CORBA::SystemException
00066 ))
00067 {
00068 int result = this->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00069 ACE_CHECK;
00070 if ( result == 1)
00071 return;
00072
00073 TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance();
00074
00075
00076 properties->orb (CORBA::ORB::_nil ());
00077 properties->default_poa (PortableServer::POA::_nil ());
00078
00079 ec_container_.reset( 0 );
00080 }
00081
00082 void
00083 TAO_Notify_EventChannelFactory::init (PortableServer::POA_ptr poa ACE_ENV_ARG_DECL)
00084 {
00085 ACE_ASSERT (this->ec_container_.get() == 0);
00086
00087 this->default_filter_factory_ =
00088 TAO_Notify_PROPERTIES::instance()->builder()->build_filter_factory (ACE_ENV_SINGLE_ARG_PARAMETER);
00089 ACE_CHECK;
00090
00091
00092 TAO_Notify_EventChannel_Container* ecc = 0;
00093 ACE_NEW_THROW_EX (ecc,
00094 TAO_Notify_EventChannel_Container (),
00095 CORBA::INTERNAL ());
00096 ACE_CHECK;
00097 this->ec_container_.reset( ecc );
00098
00099 this->ec_container().init (ACE_ENV_SINGLE_ARG_PARAMETER);
00100 ACE_CHECK;
00101
00102 TAO_Notify_POA_Helper* object_poa = 0;
00103
00104
00105 ACE_NEW_THROW_EX (object_poa,
00106 TAO_Notify_POA_Helper (),
00107 CORBA::NO_MEMORY ());
00108 ACE_CHECK;
00109
00110 ACE_Auto_Ptr<TAO_Notify_POA_Helper> auto_object_poa (object_poa);
00111
00112 object_poa->init (poa ACE_ENV_ARG_PARAMETER);
00113 ACE_CHECK;
00114
00115 this->adopt_poa (auto_object_poa.release ());
00116
00117
00118
00119 this->topology_factory_ =
00120 ACE_Dynamic_Service <TAO_Notify::Topology_Factory>::instance ("Topology_Factory");
00121
00122 this->load_topology (ACE_ENV_SINGLE_ARG_PARAMETER);
00123 ACE_CHECK;
00124
00125 this->load_event_persistence (ACE_ENV_SINGLE_ARG_PARAMETER);
00126 ACE_CHECK;
00127 }
00128
00129 void
00130 TAO_Notify_EventChannelFactory::_add_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00131 {
00132 this->_incr_refcnt ();
00133 }
00134
00135 void
00136 TAO_Notify_EventChannelFactory::_remove_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00137 {
00138 this->_decr_refcnt ();
00139 }
00140
00141 void
00142 TAO_Notify_EventChannelFactory::release (void)
00143 {
00144 delete this;
00145
00146 }
00147
00148 void
00149 TAO_Notify_EventChannelFactory::remove (TAO_Notify_EventChannel* event_channel ACE_ENV_ARG_DECL)
00150 {
00151 this->ec_container().remove (event_channel ACE_ENV_ARG_PARAMETER);
00152 ACE_CHECK;
00153 this->self_change (ACE_ENV_SINGLE_ARG_PARAMETER);
00154 }
00155
00156 int
00157 TAO_Notify_EventChannelFactory::shutdown (ACE_ENV_SINGLE_ARG_DECL)
00158 {
00159 int sd_ret = TAO_Notify_Object::shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00160 ACE_CHECK_RETURN (1);
00161
00162 if (sd_ret == 1)
00163 return 1;
00164
00165 this->ec_container().shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00166 ACE_CHECK_RETURN (1);
00167
00168 return 0;
00169 }
00170
00171 CosNotifyFilter::FilterFactory_ptr
00172 TAO_Notify_EventChannelFactory::get_default_filter_factory (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00173 {
00174 return CosNotifyFilter::FilterFactory::_duplicate (this->default_filter_factory_.in ());
00175 }
00176
00177 ::CosNotifyChannelAdmin::EventChannel_ptr TAO_Notify_EventChannelFactory::create_channel (
00178 const CosNotification::QoSProperties & initial_qos,
00179 const CosNotification::AdminProperties & initial_admin,
00180 CosNotifyChannelAdmin::ChannelID_out id ACE_ENV_ARG_DECL
00181 )
00182 ACE_THROW_SPEC ((
00183 CORBA::SystemException
00184 , CosNotification::UnsupportedQoS
00185 , CosNotification::UnsupportedAdmin
00186 ))
00187 {
00188 CosNotifyChannelAdmin::EventChannel_var ec =
00189 TAO_Notify_PROPERTIES::instance()->builder()->build_event_channel (this
00190 , initial_qos
00191 , initial_admin
00192 , id
00193 ACE_ENV_ARG_PARAMETER);
00194 ACE_CHECK_RETURN (CosNotifyChannelAdmin::EventChannel::_nil());
00195 this->self_change (ACE_ENV_SINGLE_ARG_PARAMETER);
00196 ACE_CHECK_RETURN (CosNotifyChannelAdmin::EventChannel::_nil());
00197 return ec._retn ();
00198 }
00199
00200 CosNotifyChannelAdmin::ChannelIDSeq*
00201 TAO_Notify_EventChannelFactory::get_all_channels (ACE_ENV_SINGLE_ARG_DECL)
00202 ACE_THROW_SPEC ((
00203 CORBA::SystemException
00204 ))
00205 {
00206 TAO_Notify_EventChannel_Seq_Worker seq_worker;
00207
00208 return seq_worker.create (this->ec_container() ACE_ENV_ARG_PARAMETER);
00209 }
00210
00211 CosNotifyChannelAdmin::EventChannel_ptr
00212 TAO_Notify_EventChannelFactory::get_event_channel (CosNotifyChannelAdmin::ChannelID id ACE_ENV_ARG_DECL)
00213 ACE_THROW_SPEC ((
00214 CORBA::SystemException
00215 , CosNotifyChannelAdmin::ChannelNotFound
00216 ))
00217 {
00218 TAO_Notify_EventChannel_Find_Worker find_worker;
00219
00220 return find_worker.resolve (id, this->ec_container() ACE_ENV_ARG_PARAMETER);
00221 }
00222
00223 void
00224 TAO_Notify_EventChannelFactory::set_topology_factory(TAO_Notify::Topology_Factory* f)
00225 {
00226 ACE_DEBUG ((LM_DEBUG,
00227 ACE_TEXT ("(%P,%t) Debug Topology_Factory installed in EventChannelFactory.\n")
00228 ));
00229
00230
00231
00232 this->topology_factory_ = f;
00233 }
00234
00235 void
00236 TAO_Notify_EventChannelFactory::load_topology (ACE_ENV_SINGLE_ARG_DECL)
00237 {
00238 this->loading_topology_ = true;
00239 if (this->topology_factory_ != 0)
00240 {
00241
00242 auto_ptr<TAO_Notify::Topology_Loader> tl(this->topology_factory_->create_loader());
00243 if (tl.get () != 0)
00244 {
00245 tl->load (this ACE_ENV_ARG_PARAMETER);
00246 ACE_CHECK;
00247 }
00248 }
00249 else
00250 {
00251 if (TAO_debug_level > 0)
00252 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Topology persistence disabled.\n")));
00253 }
00254 this->loading_topology_ = false;
00255 }
00256 bool
00257 TAO_Notify_EventChannelFactory::is_persistent () const
00258 {
00259 return true;
00260 }
00261
00262 void
00263 TAO_Notify_EventChannelFactory::save_persistent (TAO_Notify::Topology_Saver& saver ACE_ENV_ARG_DECL)
00264 {
00265 bool changed = this->self_changed_;
00266 this->self_changed_ = false;
00267 this->children_changed_ = false;
00268
00269 TAO_Notify::NVPList attrs;
00270
00271 bool want_all_children =
00272 saver.begin_object(0, "channel_factory", attrs, changed ACE_ENV_ARG_PARAMETER);
00273 ACE_CHECK;
00274
00275
00276
00277
00278 TAO_Notify::Save_Persist_Worker<TAO_Notify_EventChannel> wrk(saver, want_all_children);
00279
00280 this->ec_container().collection()->for_each(&wrk ACE_ENV_ARG_PARAMETER);
00281 ACE_CHECK;
00282
00283 if (want_all_children || this->reconnect_registry_.is_changed ())
00284 {
00285 this->reconnect_registry_.save_persistent(saver ACE_ENV_ARG_PARAMETER);
00286 ACE_CHECK;
00287 }
00288 saver.end_object(0, "channel_factory" ACE_ENV_ARG_PARAMETER);
00289 }
00290
00291 void
00292 TAO_Notify_EventChannelFactory::load_event_persistence (ACE_ENV_SINGLE_ARG_DECL)
00293 {
00294 TAO_Notify::Event_Persistence_Strategy * strategy =
00295 ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence");
00296 if (strategy != 0)
00297 {
00298 if (this->topology_factory_ != 0)
00299 {
00300 TAO_Notify::Event_Persistence_Factory * factory = strategy->get_factory ();
00301 if (factory != 0)
00302 {
00303 for (
00304 TAO_Notify::Routing_Slip_Persistence_Manager * rspm = factory->first_reload_manager();
00305 rspm != 0;
00306 rspm = rspm->load_next ())
00307 {
00308 TAO_Notify::Routing_Slip_Ptr routing_slip = TAO_Notify::Routing_Slip::create (*this, rspm);
00309 if (!routing_slip.null ())
00310 {
00311 this->routing_slip_restart_set_.insert (routing_slip);
00312 }
00313 else
00314 {
00315
00316
00317 ACE_DEBUG ((LM_DEBUG,
00318 ACE_TEXT ("(%P|%t) Reload persistent event failed.\n")
00319 ));
00320 }
00321 }
00322 }
00323 }
00324 else
00325 {
00326 ACE_ERROR ((LM_ERROR,
00327 ACE_TEXT ("(%P|%t) Notify Service: Configuration error. Event Persistence requires Topology Persistence.\n")
00328 ));
00329 ACE_THROW (CORBA::PERSIST_STORE());
00330 ACE_CHECK;
00331 }
00332 }
00333 }
00334
00335 bool
00336 TAO_Notify_EventChannelFactory::change_to_parent (ACE_ENV_SINGLE_ARG_DECL)
00337 {
00338 bool saving = false;
00339 if (! this->loading_topology_)
00340 {
00341
00342 if (this->topology_factory_ != 0)
00343 {
00344 saving = true;
00345
00346
00347
00348
00349 short seq = this->topology_save_seq_;
00350 ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->topology_save_lock_, CORBA::INTERNAL ());
00351 ACE_CHECK_RETURN(false);
00352 if (seq == this->topology_save_seq_)
00353 {
00354 auto_ptr<TAO_Notify::Topology_Saver> saver(this->topology_factory_->create_saver());
00355 if (saver.get() != 0)
00356 {
00357 this->save_persistent(*saver ACE_ENV_ARG_PARAMETER);
00358 ACE_CHECK_RETURN(false);
00359 saver->close (ACE_ENV_SINGLE_ARG_PARAMETER);
00360 ACE_CHECK_RETURN (false);
00361 }
00362 this->topology_save_seq_ += 1;
00363 }
00364 }
00365 }
00366 return saving;
00367 }
00368
00369 TAO_Notify::Topology_Object*
00370 TAO_Notify_EventChannelFactory::load_child (const ACE_CString& type,
00371 CORBA::Long id,
00372 const TAO_Notify::
00373 NVPList& attrs
00374 ACE_ENV_ARG_DECL)
00375 {
00376
00377 TAO_Notify::Topology_Object * result = this;
00378 if (type == "channel")
00379 {
00380 if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00381 ACE_TEXT ("(%P|%t) EventChannelFactory reload channel %d\n")
00382 , static_cast<int> (id)
00383 ));
00384
00385 TAO_Notify_Builder* bld = TAO_Notify_PROPERTIES::instance()->builder();
00386 TAO_Notify_EventChannel * ec = bld->build_event_channel(
00387 this ,
00388 id
00389 ACE_ENV_ARG_PARAMETER);
00390 ACE_CHECK_RETURN(0);
00391
00392 ec->load_attrs (attrs);
00393
00394 result = ec;
00395 }
00396 else if (type == TAO_Notify::REGISTRY_TYPE)
00397 {
00398 result = & this->reconnect_registry_;
00399 }
00400 return result;
00401 }
00402
00403 void
00404 TAO_Notify_EventChannelFactory::reconnect (ACE_ENV_SINGLE_ARG_DECL)
00405 {
00406
00407 TAO_Notify::Reconnect_Worker<TAO_Notify_EventChannel> wrk;
00408
00409 this->ec_container().collection()->for_each(&wrk ACE_ENV_ARG_PARAMETER);
00410 ACE_CHECK;
00411
00412
00413 ACE_ASSERT (!CORBA::is_nil (this->channel_factory_.in ()));
00414 this->reconnect_registry_.send_reconnect (this->channel_factory_.in () ACE_ENV_ARG_PARAMETER);
00415 ACE_CHECK;
00416
00417
00418 Routing_Slip_Set::CONST_ITERATOR iter (this->routing_slip_restart_set_);
00419 TAO_Notify::Routing_Slip_Ptr * routing_slip;
00420 for (iter.first(); iter.next(routing_slip); iter.advance())
00421 {
00422 (*routing_slip)->reconnect(ACE_ENV_SINGLE_ARG_PARAMETER);
00423 ACE_CHECK;
00424 }
00425 this->routing_slip_restart_set_.reset ();
00426 }
00427
00428 NotifyExt::ReconnectionRegistry::ReconnectionID
00429 TAO_Notify_EventChannelFactory::register_callback (
00430 NotifyExt::ReconnectionCallback_ptr reconnection
00431 ACE_ENV_ARG_DECL)
00432 ACE_THROW_SPEC ((CORBA::SystemException))
00433 {
00434 return this->reconnect_registry_.register_callback (
00435 reconnection
00436 ACE_ENV_ARG_PARAMETER);
00437 }
00438
00439 void
00440 TAO_Notify_EventChannelFactory::unregister_callback (
00441 NotifyExt::ReconnectionRegistry::ReconnectionID id
00442 ACE_ENV_ARG_DECL)
00443 ACE_THROW_SPEC ((CORBA::SystemException))
00444 {
00445 this->reconnect_registry_.unregister_callback (
00446 id
00447 ACE_ENV_ARG_PARAMETER);
00448 }
00449
00450 CORBA::Boolean
00451 TAO_Notify_EventChannelFactory::is_alive (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00452 ACE_THROW_SPEC ((CORBA::SystemException))
00453 {
00454 return CORBA::Boolean (1);
00455 }
00456
00457 void
00458 TAO_Notify_EventChannelFactory::save_topology (ACE_ENV_SINGLE_ARG_DECL)
00459 ACE_THROW_SPEC ((CORBA::SystemException))
00460 {
00461 this->self_change (ACE_ENV_SINGLE_ARG_PARAMETER);
00462 ACE_CHECK;
00463 }
00464
00465 TAO_Notify_ProxyConsumer *
00466 TAO_Notify_EventChannelFactory::find_proxy_consumer (TAO_Notify::IdVec & id_path, size_t position ACE_ENV_ARG_DECL)
00467 {
00468 TAO_Notify_ProxyConsumer * result = 0;
00469 size_t path_size = id_path.size ();
00470
00471
00472
00473 if (position < path_size && id_path[position] == this->id())
00474 {
00475 ++position;
00476 }
00477 if (position < path_size)
00478 {
00479 TAO_Notify_EventChannel_Find_Worker find_worker;
00480
00481 TAO_Notify_EventChannel * ec = find_worker.find (id_path[position], this->ec_container() ACE_ENV_ARG_PARAMETER);
00482 ACE_CHECK_RETURN (0);
00483 ++position;
00484 if (ec != 0)
00485 {
00486 result = ec->find_proxy_consumer (id_path, position
00487 ACE_ENV_ARG_PARAMETER);
00488 ACE_CHECK_RETURN(0);
00489 }
00490 }
00491 return result;
00492 }
00493
00494 TAO_Notify_ProxySupplier *
00495 TAO_Notify_EventChannelFactory::find_proxy_supplier (TAO_Notify::IdVec & id_path, size_t position ACE_ENV_ARG_DECL)
00496 {
00497 TAO_Notify_ProxySupplier * result = 0;
00498 size_t path_size = id_path.size ();
00499
00500
00501
00502 if (position < path_size && id_path[position] == this->id())
00503 {
00504 ++position;
00505 }
00506 if (position < path_size)
00507 {
00508 TAO_Notify_EventChannel_Find_Worker find_worker;
00509 TAO_Notify_EventChannel * ec = find_worker.find (id_path[position], this->ec_container() ACE_ENV_ARG_PARAMETER);
00510 ACE_CHECK_RETURN (0);
00511 ++position;
00512 if (ec != 0)
00513 {
00514 result = ec->find_proxy_supplier (id_path, position
00515 ACE_ENV_ARG_PARAMETER);
00516 ACE_CHECK_RETURN(0);
00517 }
00518 }
00519 return result;
00520 }
00521
00522 CosNotifyChannelAdmin::EventChannelFactory_ptr
00523 TAO_Notify_EventChannelFactory::activate_self (ACE_ENV_SINGLE_ARG_DECL)
00524 {
00525 CORBA::Object_var obj = this->activate (this ACE_ENV_ARG_PARAMETER);
00526 ACE_CHECK_RETURN (CosNotifyChannelAdmin::EventChannelFactory::_nil ());
00527 this->channel_factory_
00528 = CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in() ACE_ENV_ARG_PARAMETER);
00529 ACE_CHECK_RETURN (CosNotifyChannelAdmin::EventChannelFactory::_nil ());
00530 CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in() ACE_ENV_ARG_PARAMETER);
00531 ACE_CHECK_RETURN (CosNotifyChannelAdmin::EventChannelFactory::_nil ());
00532
00533 ACE_TRY_NEW_ENV
00534 {
00535 if (DEBUG_LEVEL > 9)
00536 {
00537 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) TAO_Notify_EventChannelFactory::activate_self") ));
00538 }
00539 this->reconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
00540 ACE_TRY_CHECK;
00541 }
00542 ACE_CATCHANY
00543 {
00544
00545 }
00546 ACE_ENDTRY;
00547 return this->channel_factory_._retn();
00548 }
00549
00550
00551 TAO_Notify_Object::ID
00552 TAO_Notify_EventChannelFactory::get_id () const
00553 {
00554 return id();
00555 }
00556
00557 TAO_Notify_EventChannelFactory::TAO_Notify_EventChannel_Container&
00558 TAO_Notify_EventChannelFactory::ec_container()
00559 {
00560 ACE_ASSERT( this->ec_container_.get() != 0 );
00561 return *ec_container_;
00562 }
00563
00564 TAO_END_VERSIONED_NAMESPACE_DECL