00001
00002
00003 #include "orbsvcs/Notify/EventChannelFactory.h"
00004
00005 ACE_RCSID(Notify,
00006 TAO_Notify_EventChannelFactory,
00007 "$Id: EventChannelFactory.cpp 85507 2009-06-04 11:15:22Z johnnyw $")
00008
00009 #include "orbsvcs/Notify/Properties.h"
00010 #include "orbsvcs/Notify/Factory.h"
00011 #include "orbsvcs/Notify/Builder.h"
00012 #include "orbsvcs/Notify/Topology_Saver.h"
00013 #include "orbsvcs/Notify/Topology_Loader.h"
00014 #include "orbsvcs/Notify/Save_Persist_Worker_T.h"
00015 #include "orbsvcs/Notify/Reconnect_Worker_T.h"
00016 #include "orbsvcs/Notify/Event_Persistence_Strategy.h"
00017 #include "orbsvcs/Notify/Routing_Slip_Persistence_Manager.h"
00018 #include "orbsvcs/Notify/EventChannel.h"
00019 #include "orbsvcs/Notify/Container_T.h"
00020 #include "orbsvcs/Notify/Find_Worker_T.h"
00021 #include "orbsvcs/Notify/Seq_Worker_T.h"
00022 #include "orbsvcs/Notify/POA_Helper.h"
00023 #include "orbsvcs/Notify/Validate_Worker_T.h"
00024 #include "orbsvcs/Notify/Validate_Client_Task.h"
00025 #include "orbsvcs/Notify/FilterFactory.h"
00026
00027 #include "ace/Dynamic_Service.h"
00028
00029 #include "tao/debug.h"
00030
00031 #ifndef DEBUG_LEVEL
00032 # define DEBUG_LEVEL TAO_debug_level
00033 #endif //DEBUG_LEVEL
00034
00035 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00036
00037
00038
00039 namespace TAO_Notify
00040 {
00041
00042 Topology_Factory::~Topology_Factory ()
00043 {
00044 }
00045 }
00046
00047 typedef TAO_Notify_Find_Worker_T<TAO_Notify_EventChannel
00048 , CosNotifyChannelAdmin::EventChannel
00049 , CosNotifyChannelAdmin::EventChannel_ptr
00050 , CosNotifyChannelAdmin::ChannelNotFound>
00051 TAO_Notify_EventChannel_Find_Worker;
00052
00053 typedef TAO_Notify_Seq_Worker_T<TAO_Notify_EventChannel> TAO_Notify_EventChannel_Seq_Worker;
00054
00055 TAO_Notify_EventChannelFactory::TAO_Notify_EventChannelFactory (void)
00056 : topology_save_seq_ (0)
00057 , topology_factory_(0)
00058 , reconnect_registry_(*this)
00059 , loading_topology_ (false)
00060 {
00061 }
00062
00063 TAO_Notify_EventChannelFactory::~TAO_Notify_EventChannelFactory ()
00064 {
00065 }
00066
00067 void
00068 TAO_Notify_EventChannelFactory::destroy (void)
00069 {
00070 if (this->shutdown () == 1)
00071 return;
00072
00073 TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance();
00074
00075
00076 properties->orb (CORBA::ORB::_nil ());
00077 properties->default_poa (PortableServer::POA::_nil ());
00078
00079 ec_container_.reset( 0 );
00080 }
00081
00082 void
00083 TAO_Notify_EventChannelFactory::init (PortableServer::POA_ptr poa)
00084 {
00085 this->poa_ = PortableServer::POA::_duplicate (poa);
00086
00087 ACE_ASSERT (this->ec_container_.get() == 0);
00088
00089
00090 TAO_Notify_EventChannel_Container* ecc = 0;
00091 ACE_NEW_THROW_EX (ecc,
00092 TAO_Notify_EventChannel_Container (),
00093 CORBA::INTERNAL ());
00094 this->ec_container_.reset( ecc );
00095
00096 this->ec_container().init ();
00097
00098 TAO_Notify_POA_Helper* object_poa = 0;
00099
00100
00101 ACE_NEW_THROW_EX (object_poa,
00102 TAO_Notify_POA_Helper (),
00103 CORBA::NO_MEMORY ());
00104
00105 ACE_Auto_Ptr<TAO_Notify_POA_Helper> auto_object_poa (object_poa);
00106
00107 ACE_CString poa_name = object_poa->get_unique_id ();
00108 #if defined (CORBA_E_MICRO)
00109 object_poa->init (poa, poa_name.c_str ());
00110 #else
00111 object_poa->init_persistent (poa, poa_name.c_str ());
00112 #endif
00113
00114 this->adopt_poa (auto_object_poa.release ());
00115
00116
00117
00118 this->topology_factory_ =
00119 ACE_Dynamic_Service <TAO_Notify::Topology_Factory>::instance ("Topology_Factory");
00120
00121 this->load_topology ();
00122
00123 this->load_event_persistence ();
00124
00125 if (TAO_Notify_PROPERTIES::instance()->validate_client() == true)
00126 {
00127 TAO_Notify_validate_client_Task* validate_client_task = 0;
00128 ACE_NEW_THROW_EX (validate_client_task,
00129 TAO_Notify_validate_client_Task (TAO_Notify_PROPERTIES::instance()->validate_client_delay (),
00130 TAO_Notify_PROPERTIES::instance()->validate_client_interval (),
00131 this),
00132 CORBA::INTERNAL ());
00133 this->validate_client_task_.reset (validate_client_task);
00134 }
00135 }
00136
00137 void
00138 TAO_Notify_EventChannelFactory::_add_ref (void)
00139 {
00140 this->_incr_refcnt ();
00141 }
00142
00143 void
00144 TAO_Notify_EventChannelFactory::_remove_ref (void)
00145 {
00146 this->_decr_refcnt ();
00147 }
00148
00149 void
00150 TAO_Notify_EventChannelFactory::release (void)
00151 {
00152 delete this;
00153
00154 }
00155
00156 void
00157 TAO_Notify_EventChannelFactory::remove (TAO_Notify_EventChannel* event_channel)
00158 {
00159 this->ec_container().remove (event_channel);
00160 this->self_change ();
00161 }
00162
00163 int
00164 TAO_Notify_EventChannelFactory::shutdown (void)
00165 {
00166 this->stop_validator();
00167
00168 if (TAO_Notify_Object::shutdown () == 1)
00169 return 1;
00170
00171 this->ec_container().shutdown ();
00172
00173 return 0;
00174 }
00175
00176
00177 CosNotifyChannelAdmin::EventChannel_ptr
00178 TAO_Notify_EventChannelFactory::create_named_channel (
00179 const CosNotification::QoSProperties& initial_qos,
00180 const CosNotification::AdminProperties& initial_admin,
00181 CosNotifyChannelAdmin::ChannelID_out id,
00182 const char*)
00183 {
00184 return this->create_channel (initial_qos, initial_admin, id);
00185 }
00186
00187
00188 ::CosNotifyChannelAdmin::EventChannel_ptr TAO_Notify_EventChannelFactory::create_channel (
00189 const CosNotification::QoSProperties & initial_qos,
00190 const CosNotification::AdminProperties & initial_admin,
00191 CosNotifyChannelAdmin::ChannelID_out id
00192 )
00193 {
00194 CosNotifyChannelAdmin::EventChannel_var ec =
00195 TAO_Notify_PROPERTIES::instance()->builder()->build_event_channel (this
00196 , initial_qos
00197 , initial_admin
00198 , id);
00199 this->self_change ();
00200 return ec._retn ();
00201 }
00202
00203 CosNotifyChannelAdmin::ChannelIDSeq*
00204 TAO_Notify_EventChannelFactory::get_all_channels (void)
00205 {
00206 TAO_Notify_EventChannel_Seq_Worker seq_worker;
00207
00208 return seq_worker.create (this->ec_container());
00209 }
00210
00211 CosNotifyChannelAdmin::EventChannel_ptr
00212 TAO_Notify_EventChannelFactory::get_event_channel (CosNotifyChannelAdmin::ChannelID id)
00213 {
00214 TAO_Notify_EventChannel_Find_Worker find_worker;
00215
00216 return find_worker.resolve (id, this->ec_container());
00217 }
00218
00219 void
00220 TAO_Notify_EventChannelFactory::set_topology_factory(TAO_Notify::Topology_Factory* f)
00221 {
00222 ACE_DEBUG ((LM_DEBUG,
00223 ACE_TEXT ("(%P,%t) Debug Topology_Factory installed in EventChannelFactory.\n")
00224 ));
00225
00226
00227
00228 this->topology_factory_ = f;
00229 }
00230
00231 void
00232 TAO_Notify_EventChannelFactory::load_topology (void)
00233 {
00234 this->loading_topology_ = true;
00235 if (this->topology_factory_ != 0)
00236 {
00237
00238 auto_ptr<TAO_Notify::Topology_Loader> tl(this->topology_factory_->create_loader());
00239 if (tl.get () != 0)
00240 {
00241 tl->load (this);
00242 }
00243 }
00244 else
00245 {
00246 if (TAO_debug_level > 0)
00247 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Topology persistence disabled.\n")));
00248 }
00249 this->loading_topology_ = false;
00250 }
00251
00252 void
00253 TAO_Notify_EventChannelFactory::validate ()
00254 {
00255 TAO_Notify::Validate_Worker<TAO_Notify_EventChannel> wrk;
00256
00257 this->ec_container().collection()->for_each(&wrk);
00258 }
00259
00260 void
00261 TAO_Notify_EventChannelFactory::stop_validator ()
00262 {
00263 if (this->validate_client_task_.get () != 0)
00264 {
00265 this->validate_client_task_->shutdown ();
00266 }
00267 }
00268
00269 bool
00270 TAO_Notify_EventChannelFactory::is_persistent () const
00271 {
00272 return true;
00273 }
00274
00275 void
00276 TAO_Notify_EventChannelFactory::save_persistent (TAO_Notify::Topology_Saver& saver)
00277 {
00278 bool changed = this->self_changed_;
00279 this->self_changed_ = false;
00280 this->children_changed_ = false;
00281
00282 TAO_Notify::NVPList attrs;
00283
00284 bool want_all_children =
00285 saver.begin_object(0, "channel_factory", attrs, changed);
00286
00287
00288
00289
00290 TAO_Notify::Save_Persist_Worker<TAO_Notify_EventChannel> wrk(saver, want_all_children);
00291
00292 this->ec_container().collection()->for_each(&wrk);
00293
00294 if (want_all_children || this->reconnect_registry_.is_changed ())
00295 {
00296 this->reconnect_registry_.save_persistent(saver);
00297 }
00298 saver.end_object(0, "channel_factory");
00299 }
00300
00301 void
00302 TAO_Notify_EventChannelFactory::load_event_persistence (void)
00303 {
00304 TAO_Notify::Event_Persistence_Strategy * strategy =
00305 ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence");
00306 if (strategy != 0)
00307 {
00308 if (this->topology_factory_ != 0)
00309 {
00310 TAO_Notify::Event_Persistence_Factory * factory = strategy->get_factory ();
00311 if (factory != 0)
00312 {
00313 for (
00314 TAO_Notify::Routing_Slip_Persistence_Manager * rspm = factory->first_reload_manager();
00315 rspm != 0;
00316 rspm = rspm->load_next ())
00317 {
00318 TAO_Notify::Routing_Slip_Ptr routing_slip = TAO_Notify::Routing_Slip::create (*this, rspm);
00319 if (!routing_slip.null ())
00320 {
00321 this->routing_slip_restart_set_.insert (routing_slip);
00322 }
00323 else
00324 {
00325
00326
00327 ACE_DEBUG ((LM_DEBUG,
00328 ACE_TEXT ("(%P|%t) Reload persistent event failed.\n")
00329 ));
00330 }
00331 }
00332 }
00333 }
00334 else
00335 {
00336 ACE_ERROR ((LM_ERROR,
00337 ACE_TEXT ("(%P|%t) Notify Service: Configuration error. Event Persistence requires Topology Persistence.\n")
00338 ));
00339 throw CORBA::PERSIST_STORE();
00340 }
00341 }
00342 }
00343
00344 bool
00345 TAO_Notify_EventChannelFactory::change_to_parent (void)
00346 {
00347 bool saving = false;
00348 if (! this->loading_topology_)
00349 {
00350
00351 if (this->topology_factory_ != 0)
00352 {
00353 saving = true;
00354
00355
00356
00357
00358 short seq = this->topology_save_seq_;
00359 ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->topology_save_lock_, CORBA::INTERNAL ());
00360 if (seq == this->topology_save_seq_)
00361 {
00362 auto_ptr<TAO_Notify::Topology_Saver> saver(this->topology_factory_->create_saver());
00363 if (saver.get() != 0)
00364 {
00365 this->save_persistent(*saver);
00366 saver->close ();
00367 }
00368 this->topology_save_seq_ += 1;
00369 }
00370 }
00371 }
00372 return saving;
00373 }
00374
00375 TAO_Notify::Topology_Object*
00376 TAO_Notify_EventChannelFactory::load_child (const ACE_CString& type,
00377 CORBA::Long id,
00378 const TAO_Notify::
00379 NVPList& attrs)
00380 {
00381
00382 TAO_Notify::Topology_Object * result = this;
00383 if (type == "channel")
00384 {
00385 if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00386 ACE_TEXT ("(%P|%t) EventChannelFactory reload channel %d\n")
00387 , static_cast<int> (id)
00388 ));
00389
00390 TAO_Notify_Builder* bld = TAO_Notify_PROPERTIES::instance()->builder();
00391 TAO_Notify_EventChannel * ec = bld->build_event_channel(
00392 this ,
00393 id);
00394
00395 ec->load_attrs (attrs);
00396
00397 result = ec;
00398 }
00399 else if (type == TAO_Notify::REGISTRY_TYPE)
00400 {
00401 result = & this->reconnect_registry_;
00402 }
00403 return result;
00404 }
00405
00406 void
00407 TAO_Notify_EventChannelFactory::reconnect (void)
00408 {
00409
00410 TAO_Notify::Reconnect_Worker<TAO_Notify_EventChannel> wrk;
00411
00412 this->ec_container().collection()->for_each(&wrk);
00413
00414
00415 ACE_ASSERT (!CORBA::is_nil (this->channel_factory_.in ()));
00416 this->reconnect_registry_.send_reconnect (this->channel_factory_.in ());
00417
00418
00419 Routing_Slip_Set::CONST_ITERATOR iter (this->routing_slip_restart_set_);
00420 TAO_Notify::Routing_Slip_Ptr * routing_slip;
00421 for (iter.first(); iter.next(routing_slip); iter.advance())
00422 {
00423 (*routing_slip)->reconnect();
00424 }
00425 this->routing_slip_restart_set_.reset ();
00426 }
00427
00428 NotifyExt::ReconnectionRegistry::ReconnectionID
00429 TAO_Notify_EventChannelFactory::register_callback (
00430 NotifyExt::ReconnectionCallback_ptr reconnection)
00431 {
00432 return this->reconnect_registry_.register_callback (
00433 reconnection);
00434 }
00435
00436 void
00437 TAO_Notify_EventChannelFactory::unregister_callback (
00438 NotifyExt::ReconnectionRegistry::ReconnectionID id)
00439 {
00440 this->reconnect_registry_.unregister_callback (
00441 id);
00442 }
00443
00444 CORBA::Boolean
00445 TAO_Notify_EventChannelFactory::is_alive (void)
00446 {
00447 return CORBA::Boolean (1);
00448 }
00449
00450 void
00451 TAO_Notify_EventChannelFactory::save_topology (void)
00452 {
00453 this->self_change ();
00454 }
00455
00456 TAO_Notify_ProxyConsumer *
00457 TAO_Notify_EventChannelFactory::find_proxy_consumer (TAO_Notify::IdVec & id_path, size_t position)
00458 {
00459 TAO_Notify_ProxyConsumer * result = 0;
00460 size_t path_size = id_path.size ();
00461
00462
00463
00464 if (position < path_size && id_path[position] == this->id())
00465 {
00466 ++position;
00467 }
00468 if (position < path_size)
00469 {
00470 TAO_Notify_EventChannel_Find_Worker find_worker;
00471
00472 TAO_Notify_EventChannel * ec = find_worker.find (id_path[position], this->ec_container());
00473 ++position;
00474 if (ec != 0)
00475 {
00476 result = ec->find_proxy_consumer (id_path, position);
00477 }
00478 }
00479 return result;
00480 }
00481
00482 TAO_Notify_ProxySupplier *
00483 TAO_Notify_EventChannelFactory::find_proxy_supplier (TAO_Notify::IdVec & id_path, size_t position)
00484 {
00485 TAO_Notify_ProxySupplier * result = 0;
00486 size_t path_size = id_path.size ();
00487
00488
00489
00490 if (position < path_size && id_path[position] == this->id())
00491 {
00492 ++position;
00493 }
00494 if (position < path_size)
00495 {
00496 TAO_Notify_EventChannel_Find_Worker find_worker;
00497 TAO_Notify_EventChannel * ec = find_worker.find (id_path[position], this->ec_container());
00498 ++position;
00499 if (ec != 0)
00500 {
00501 result = ec->find_proxy_supplier (id_path, position);
00502 }
00503 }
00504 return result;
00505 }
00506
00507 CosNotifyChannelAdmin::EventChannelFactory_ptr
00508 TAO_Notify_EventChannelFactory::activate_self (void)
00509 {
00510 CORBA::Object_var obj = this->activate (this);
00511 this->channel_factory_
00512 = CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in());
00513
00514 try
00515 {
00516 if (DEBUG_LEVEL > 9)
00517 {
00518 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) TAO_Notify_EventChannelFactory::activate_self\n") ));
00519 }
00520 this->reconnect ();
00521 }
00522 catch (const CORBA::Exception&)
00523 {
00524
00525 }
00526 return this->channel_factory_._retn();
00527 }
00528
00529
00530 TAO_Notify_Object::ID
00531 TAO_Notify_EventChannelFactory::get_id () const
00532 {
00533 return id();
00534 }
00535
00536 TAO_Notify_EventChannelFactory::TAO_Notify_EventChannel_Container&
00537 TAO_Notify_EventChannelFactory::ec_container()
00538 {
00539 ACE_ASSERT( this->ec_container_.get() != 0 );
00540 return *ec_container_;
00541 }
00542
00543
00544 TAO_END_VERSIONED_NAMESPACE_DECL