EventChannelFactory.cpp

Go to the documentation of this file.
00001 // EventChannelFactory.cpp,v 1.18 2006/03/14 06:14:34 jtc Exp
00002 
00003 #include "orbsvcs/Notify/EventChannelFactory.h"
00004 
00005 ACE_RCSID(Notify, TAO_Notify_EventChannelFactory, "EventChannelFactory.cpp,v 1.18 2006/03/14 06:14:34 jtc Exp")
00006 
00007 #include "orbsvcs/Notify/Properties.h"
00008 #include "orbsvcs/Notify/Factory.h"
00009 #include "orbsvcs/Notify/Builder.h"
00010 #include "orbsvcs/Notify/Topology_Saver.h"
00011 #include "orbsvcs/Notify/Topology_Loader.h"
00012 #include "orbsvcs/Notify/Save_Persist_Worker_T.h"
00013 #include "orbsvcs/Notify/Reconnect_Worker_T.h"
00014 #include "orbsvcs/Notify/Event_Persistence_Strategy.h"
00015 #include "orbsvcs/Notify/Routing_Slip_Persistence_Manager.h"
00016 #include "orbsvcs/Notify/EventChannel.h"
00017 #include "orbsvcs/Notify/Container_T.h"
00018 #include "orbsvcs/Notify/Find_Worker_T.h"
00019 #include "orbsvcs/Notify/Seq_Worker_T.h"
00020 #include "orbsvcs/Notify/POA_Helper.h"
00021 
00022 #include "ace/Dynamic_Service.h"
00023 
00024 #include "tao/debug.h"
00025 //#define DEBUG_LEVEL 9
00026 #ifndef DEBUG_LEVEL
00027 # define DEBUG_LEVEL TAO_debug_level
00028 #endif //DEBUG_LEVEL
00029 
00030 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00031 
00032 // Include this here since this is the only file that
00033 // requires Topology_Factory.
00034 namespace TAO_Notify
00035 {
00036   // virtual
00037   Topology_Factory::~Topology_Factory ()
00038   {
00039   }
00040 }
00041 
00042 typedef TAO_Notify_Find_Worker_T<TAO_Notify_EventChannel
00043                              , CosNotifyChannelAdmin::EventChannel
00044                              , CosNotifyChannelAdmin::EventChannel_ptr
00045                              , CosNotifyChannelAdmin::ChannelNotFound>
00046 TAO_Notify_EventChannel_Find_Worker;
00047 
00048 typedef TAO_Notify_Seq_Worker_T<TAO_Notify_EventChannel> TAO_Notify_EventChannel_Seq_Worker;
00049 
00050 TAO_Notify_EventChannelFactory::TAO_Notify_EventChannelFactory (void)
00051   : topology_save_seq_ (0)
00052   , topology_factory_(0)
00053   , reconnect_registry_(*this)
00054   , loading_topology_ (false)
00055 {
00056 }
00057 
00058 TAO_Notify_EventChannelFactory::~TAO_Notify_EventChannelFactory ()
00059 {
00060 }
00061 
00062 void
00063 TAO_Notify_EventChannelFactory::destroy (ACE_ENV_SINGLE_ARG_DECL)
00064   ACE_THROW_SPEC ((
00065                    CORBA::SystemException
00066                    ))
00067 {
00068   int result = this->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00069   ACE_CHECK;
00070   if ( result == 1)
00071     return;
00072 
00073   TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance();
00074 
00075   // Reset references to CORBA objects.
00076   properties->orb (CORBA::ORB::_nil ());
00077   properties->default_poa (PortableServer::POA::_nil ());
00078 
00079   ec_container_.reset( 0 );
00080 }
00081 
00082 void
00083 TAO_Notify_EventChannelFactory::init (PortableServer::POA_ptr poa ACE_ENV_ARG_DECL)
00084 {
00085   ACE_ASSERT (this->ec_container_.get() == 0);
00086 
00087   this->default_filter_factory_ =
00088     TAO_Notify_PROPERTIES::instance()->builder()->build_filter_factory (ACE_ENV_SINGLE_ARG_PARAMETER);
00089   ACE_CHECK;
00090 
00091   // Init ec_container_
00092   TAO_Notify_EventChannel_Container* ecc = 0;
00093   ACE_NEW_THROW_EX (ecc,
00094                     TAO_Notify_EventChannel_Container (),
00095                     CORBA::INTERNAL ());
00096   ACE_CHECK;
00097   this->ec_container_.reset( ecc );
00098 
00099   this->ec_container().init (ACE_ENV_SINGLE_ARG_PARAMETER);
00100   ACE_CHECK;
00101 
00102   TAO_Notify_POA_Helper* object_poa = 0;
00103 
00104   // Bootstrap initial Object POA
00105   ACE_NEW_THROW_EX (object_poa,
00106                     TAO_Notify_POA_Helper (),
00107                     CORBA::NO_MEMORY ());
00108   ACE_CHECK;
00109 
00110   ACE_Auto_Ptr<TAO_Notify_POA_Helper> auto_object_poa (object_poa);
00111 
00112   object_poa->init (poa ACE_ENV_ARG_PARAMETER);
00113   ACE_CHECK;
00114 
00115   this->adopt_poa (auto_object_poa.release ());
00116 
00117   // Note topology factory is configured separately from the "builder" mediated
00118   // objects since it is independant of the "style" of Notification Service.
00119   this->topology_factory_ =
00120     ACE_Dynamic_Service <TAO_Notify::Topology_Factory>::instance ("Topology_Factory");
00121 
00122   this->load_topology (ACE_ENV_SINGLE_ARG_PARAMETER);
00123   ACE_CHECK;
00124 
00125   this->load_event_persistence (ACE_ENV_SINGLE_ARG_PARAMETER);
00126   ACE_CHECK;
00127 }
00128 
00129 void
00130 TAO_Notify_EventChannelFactory::_add_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00131 {
00132   this->_incr_refcnt ();
00133 }
00134 
00135 void
00136 TAO_Notify_EventChannelFactory::_remove_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00137 {
00138   this->_decr_refcnt ();
00139 }
00140 
00141 void
00142 TAO_Notify_EventChannelFactory::release (void)
00143 {
00144   delete this;
00145   //@@ inform factory
00146 }
00147 
00148 void
00149 TAO_Notify_EventChannelFactory::remove (TAO_Notify_EventChannel* event_channel ACE_ENV_ARG_DECL)
00150 {
00151   this->ec_container().remove (event_channel ACE_ENV_ARG_PARAMETER);
00152   ACE_CHECK;
00153   this->self_change (ACE_ENV_SINGLE_ARG_PARAMETER);
00154 }
00155 
00156 int
00157 TAO_Notify_EventChannelFactory::shutdown (ACE_ENV_SINGLE_ARG_DECL)
00158 {
00159   int sd_ret = TAO_Notify_Object::shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00160   ACE_CHECK_RETURN (1);
00161 
00162   if (sd_ret == 1)
00163     return 1;
00164 
00165   this->ec_container().shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
00166   ACE_CHECK_RETURN (1);
00167 
00168   return 0;
00169 }
00170 
00171 CosNotifyFilter::FilterFactory_ptr
00172 TAO_Notify_EventChannelFactory::get_default_filter_factory (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00173 {
00174   return CosNotifyFilter::FilterFactory::_duplicate (this->default_filter_factory_.in ());
00175 }
00176 
00177 ::CosNotifyChannelAdmin::EventChannel_ptr TAO_Notify_EventChannelFactory::create_channel (
00178     const CosNotification::QoSProperties & initial_qos,
00179     const CosNotification::AdminProperties & initial_admin,
00180     CosNotifyChannelAdmin::ChannelID_out id ACE_ENV_ARG_DECL
00181   )
00182   ACE_THROW_SPEC ((
00183                    CORBA::SystemException
00184                    , CosNotification::UnsupportedQoS
00185                    , CosNotification::UnsupportedAdmin
00186                    ))
00187 {
00188   CosNotifyChannelAdmin::EventChannel_var ec =
00189     TAO_Notify_PROPERTIES::instance()->builder()->build_event_channel (this
00190                                                                         , initial_qos
00191                                                                         , initial_admin
00192                                                                         , id
00193                                                                         ACE_ENV_ARG_PARAMETER);
00194   ACE_CHECK_RETURN (CosNotifyChannelAdmin::EventChannel::_nil());
00195   this->self_change (ACE_ENV_SINGLE_ARG_PARAMETER);
00196   ACE_CHECK_RETURN (CosNotifyChannelAdmin::EventChannel::_nil());
00197   return ec._retn ();
00198 }
00199 
00200 CosNotifyChannelAdmin::ChannelIDSeq*
00201 TAO_Notify_EventChannelFactory::get_all_channels (ACE_ENV_SINGLE_ARG_DECL)
00202   ACE_THROW_SPEC ((
00203     CORBA::SystemException
00204   ))
00205 {
00206   TAO_Notify_EventChannel_Seq_Worker seq_worker;
00207 
00208   return seq_worker.create (this->ec_container() ACE_ENV_ARG_PARAMETER);
00209 }
00210 
00211 CosNotifyChannelAdmin::EventChannel_ptr
00212 TAO_Notify_EventChannelFactory::get_event_channel (CosNotifyChannelAdmin::ChannelID id ACE_ENV_ARG_DECL)
00213   ACE_THROW_SPEC ((
00214                    CORBA::SystemException
00215                    , CosNotifyChannelAdmin::ChannelNotFound
00216                    ))
00217 {
00218   TAO_Notify_EventChannel_Find_Worker find_worker;
00219 
00220   return find_worker.resolve (id, this->ec_container() ACE_ENV_ARG_PARAMETER);
00221 }
00222 
00223 void
00224 TAO_Notify_EventChannelFactory::set_topology_factory(TAO_Notify::Topology_Factory* f)
00225 {
00226   ACE_DEBUG ((LM_DEBUG,
00227     ACE_TEXT ("(%P,%t) Debug Topology_Factory installed in EventChannelFactory.\n")
00228     ));
00229   // If the above meessage appears when you don't expect it
00230   // use svc.conf to install the topology factory rather
00231   // than calling this method.
00232   this->topology_factory_ = f;
00233 }
00234 
00235 void
00236 TAO_Notify_EventChannelFactory::load_topology (ACE_ENV_SINGLE_ARG_DECL)
00237 {
00238   this->loading_topology_ = true;
00239   if (this->topology_factory_ != 0)
00240   {
00241     // create_loader will open and load the persistence file for validation
00242     auto_ptr<TAO_Notify::Topology_Loader> tl(this->topology_factory_->create_loader());
00243     if (tl.get () != 0)
00244     {
00245       tl->load (this ACE_ENV_ARG_PARAMETER);
00246       ACE_CHECK;
00247     }
00248   }
00249   else
00250   {
00251     if (TAO_debug_level > 0)
00252       ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Topology persistence disabled.\n")));
00253   }
00254   this->loading_topology_ = false;
00255 }
00256 bool
00257 TAO_Notify_EventChannelFactory::is_persistent () const
00258 {
00259   return true;
00260 }
00261 
00262 void
00263 TAO_Notify_EventChannelFactory::save_persistent (TAO_Notify::Topology_Saver& saver ACE_ENV_ARG_DECL)
00264 {
00265   bool changed = this->self_changed_;
00266   this->self_changed_ = false;
00267   this->children_changed_ = false;
00268 
00269   TAO_Notify::NVPList attrs; // ECF has no attributes
00270 
00271   bool want_all_children =
00272     saver.begin_object(0, "channel_factory", attrs, changed ACE_ENV_ARG_PARAMETER);
00273   ACE_CHECK;
00274 
00275   // for each deleted child
00276   //  delete_child  // if the child has persistence.
00277 
00278   TAO_Notify::Save_Persist_Worker<TAO_Notify_EventChannel> wrk(saver, want_all_children);
00279 
00280   this->ec_container().collection()->for_each(&wrk ACE_ENV_ARG_PARAMETER);
00281   ACE_CHECK;
00282 
00283   if (want_all_children || this->reconnect_registry_.is_changed ())
00284   {
00285     this->reconnect_registry_.save_persistent(saver ACE_ENV_ARG_PARAMETER);
00286     ACE_CHECK;
00287   }
00288   saver.end_object(0, "channel_factory" ACE_ENV_ARG_PARAMETER);
00289 }
00290 
00291 void
00292 TAO_Notify_EventChannelFactory::load_event_persistence (ACE_ENV_SINGLE_ARG_DECL)
00293 {
00294   TAO_Notify::Event_Persistence_Strategy * strategy =
00295     ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence");
00296   if (strategy != 0)
00297   {
00298     if (this->topology_factory_ != 0)
00299     {
00300       TAO_Notify::Event_Persistence_Factory * factory = strategy->get_factory ();
00301       if (factory != 0)
00302       {
00303         for (
00304           TAO_Notify::Routing_Slip_Persistence_Manager * rspm = factory->first_reload_manager();
00305           rspm != 0;
00306           rspm = rspm->load_next ())
00307         {
00308           TAO_Notify::Routing_Slip_Ptr routing_slip = TAO_Notify::Routing_Slip::create (*this, rspm);
00309           if (!routing_slip.null ())
00310           {
00311             this->routing_slip_restart_set_.insert (routing_slip);
00312           }
00313           else
00314           {
00315             //@@todo: tell the rspm it's an orphan, but we can't during reload
00316             // we need collect these and come back later to remove them
00317             ACE_DEBUG ((LM_DEBUG,
00318               ACE_TEXT ("(%P|%t) Reload persistent event failed.\n")
00319               ));
00320           }
00321         }
00322       }
00323     }
00324     else
00325     {
00326       ACE_ERROR ((LM_ERROR,
00327         ACE_TEXT ("(%P|%t) Notify Service: Configuration error.  Event Persistence requires Topology Persistence.\n")
00328         ));
00329       ACE_THROW (CORBA::PERSIST_STORE());
00330       ACE_CHECK;
00331     }
00332   }
00333 }
00334 
00335 bool
00336 TAO_Notify_EventChannelFactory::change_to_parent (ACE_ENV_SINGLE_ARG_DECL)
00337 {
00338   bool saving = false;
00339   if (! this->loading_topology_)
00340   {
00341     // A null pointer means that saving of topology is disabled.
00342     if (this->topology_factory_ != 0)
00343     {
00344       saving = true;
00345       // seq is used to check save-in-progress
00346       // if it changes while we're waiting for the lock
00347       // then our change may have already been saved, so
00348       // just return.  Caller will signal change again if necessary.
00349       short seq = this->topology_save_seq_;
00350       ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->topology_save_lock_, CORBA::INTERNAL ());
00351       ACE_CHECK_RETURN(false);
00352       if (seq == this->topology_save_seq_)
00353       {
00354         auto_ptr<TAO_Notify::Topology_Saver> saver(this->topology_factory_->create_saver());
00355         if (saver.get() != 0)
00356         {
00357           this->save_persistent(*saver ACE_ENV_ARG_PARAMETER);
00358           ACE_CHECK_RETURN(false);
00359           saver->close (ACE_ENV_SINGLE_ARG_PARAMETER);
00360           ACE_CHECK_RETURN (false);
00361         }
00362         this->topology_save_seq_ += 1;
00363       }
00364     }
00365   }
00366   return saving;
00367 }
00368 
00369 TAO_Notify::Topology_Object*
00370 TAO_Notify_EventChannelFactory::load_child (const ACE_CString& type,
00371                                         CORBA::Long id,
00372                                         const TAO_Notify::
00373                                         NVPList& attrs
00374                                         ACE_ENV_ARG_DECL)
00375 {
00376   // ignore anything but our valid children (ie channel)
00377   TAO_Notify::Topology_Object * result = this;
00378   if (type == "channel")
00379   {
00380     if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00381       ACE_TEXT ("(%P|%t) EventChannelFactory reload channel %d\n")
00382       , static_cast<int> (id)
00383       ));
00384 
00385     TAO_Notify_Builder* bld = TAO_Notify_PROPERTIES::instance()->builder();
00386     TAO_Notify_EventChannel * ec = bld->build_event_channel(
00387         this ,
00388         id
00389         ACE_ENV_ARG_PARAMETER);
00390     ACE_CHECK_RETURN(0);
00391 
00392     ec->load_attrs (attrs);
00393 
00394     result = ec;
00395   }
00396   else if (type == TAO_Notify::REGISTRY_TYPE)
00397   {
00398     result = & this->reconnect_registry_;
00399   }
00400   return result;
00401 }
00402 
00403 void
00404 TAO_Notify_EventChannelFactory::reconnect (ACE_ENV_SINGLE_ARG_DECL)
00405 {
00406   // Reconnect all children first
00407   TAO_Notify::Reconnect_Worker<TAO_Notify_EventChannel> wrk;
00408 
00409   this->ec_container().collection()->for_each(&wrk ACE_ENV_ARG_PARAMETER);
00410   ACE_CHECK;
00411 
00412   // Then send reconnection announcement to registered clients
00413   ACE_ASSERT (!CORBA::is_nil (this->channel_factory_.in ()));
00414   this->reconnect_registry_.send_reconnect (this->channel_factory_.in () ACE_ENV_ARG_PARAMETER);
00415   ACE_CHECK;
00416 
00417   // reactivate events in-progress
00418   Routing_Slip_Set::CONST_ITERATOR iter (this->routing_slip_restart_set_);
00419   TAO_Notify::Routing_Slip_Ptr * routing_slip;
00420   for (iter.first(); iter.next(routing_slip); iter.advance())
00421   {
00422     (*routing_slip)->reconnect(ACE_ENV_SINGLE_ARG_PARAMETER);
00423     ACE_CHECK;
00424   }
00425   this->routing_slip_restart_set_.reset ();
00426 }
00427 
00428 NotifyExt::ReconnectionRegistry::ReconnectionID
00429 TAO_Notify_EventChannelFactory::register_callback (
00430     NotifyExt::ReconnectionCallback_ptr reconnection
00431     ACE_ENV_ARG_DECL)
00432   ACE_THROW_SPEC ((CORBA::SystemException))
00433 {
00434   return this->reconnect_registry_.register_callback (
00435     reconnection
00436     ACE_ENV_ARG_PARAMETER);
00437 }
00438 
00439 void
00440 TAO_Notify_EventChannelFactory::unregister_callback (
00441     NotifyExt::ReconnectionRegistry::ReconnectionID id
00442     ACE_ENV_ARG_DECL)
00443   ACE_THROW_SPEC ((CORBA::SystemException))
00444 {
00445   this->reconnect_registry_.unregister_callback (
00446     id
00447     ACE_ENV_ARG_PARAMETER);
00448 }
00449 
00450 CORBA::Boolean
00451 TAO_Notify_EventChannelFactory::is_alive (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00452   ACE_THROW_SPEC ((CORBA::SystemException))
00453 {
00454   return CORBA::Boolean (1);
00455 }
00456 
00457 void
00458 TAO_Notify_EventChannelFactory::save_topology (ACE_ENV_SINGLE_ARG_DECL)
00459   ACE_THROW_SPEC ((CORBA::SystemException))
00460 {
00461   this->self_change (ACE_ENV_SINGLE_ARG_PARAMETER);
00462   ACE_CHECK;
00463 }
00464 
00465 TAO_Notify_ProxyConsumer *
00466 TAO_Notify_EventChannelFactory::find_proxy_consumer (TAO_Notify::IdVec & id_path, size_t position ACE_ENV_ARG_DECL)
00467 {
00468   TAO_Notify_ProxyConsumer * result = 0;
00469   size_t path_size = id_path.size ();
00470 
00471   // EventChannelFactory only:  The first id is proably for the ECF itself
00472   // if so, silently consume it.
00473   if (position < path_size && id_path[position] == this->id())
00474   {
00475     ++position;
00476   }
00477   if (position < path_size)
00478   {
00479     TAO_Notify_EventChannel_Find_Worker find_worker;
00480 
00481     TAO_Notify_EventChannel * ec = find_worker.find (id_path[position], this->ec_container() ACE_ENV_ARG_PARAMETER);
00482     ACE_CHECK_RETURN (0);
00483     ++position;
00484     if (ec != 0)
00485     {
00486       result = ec->find_proxy_consumer (id_path, position
00487         ACE_ENV_ARG_PARAMETER);
00488       ACE_CHECK_RETURN(0);
00489     }
00490   }
00491   return result;
00492 }
00493 
00494 TAO_Notify_ProxySupplier *
00495 TAO_Notify_EventChannelFactory::find_proxy_supplier (TAO_Notify::IdVec & id_path, size_t position ACE_ENV_ARG_DECL)
00496 {
00497   TAO_Notify_ProxySupplier * result = 0;
00498   size_t path_size = id_path.size ();
00499 
00500   // EventChannelFactory only:  The first id is proably for the ECF itself
00501   // if so, silently consume it.
00502   if (position < path_size && id_path[position] == this->id())
00503   {
00504     ++position;
00505   }
00506   if (position < path_size)
00507   {
00508     TAO_Notify_EventChannel_Find_Worker find_worker;
00509     TAO_Notify_EventChannel * ec = find_worker.find (id_path[position], this->ec_container() ACE_ENV_ARG_PARAMETER);
00510     ACE_CHECK_RETURN (0);
00511     ++position;
00512     if (ec != 0)
00513     {
00514       result = ec->find_proxy_supplier (id_path, position
00515         ACE_ENV_ARG_PARAMETER);
00516       ACE_CHECK_RETURN(0);
00517     }
00518   }
00519   return result;
00520 }
00521 
00522 CosNotifyChannelAdmin::EventChannelFactory_ptr
00523 TAO_Notify_EventChannelFactory::activate_self (ACE_ENV_SINGLE_ARG_DECL)
00524 {
00525   CORBA::Object_var obj = this->activate (this ACE_ENV_ARG_PARAMETER);
00526   ACE_CHECK_RETURN (CosNotifyChannelAdmin::EventChannelFactory::_nil ());
00527   this->channel_factory_
00528     = CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in() ACE_ENV_ARG_PARAMETER);
00529   ACE_CHECK_RETURN (CosNotifyChannelAdmin::EventChannelFactory::_nil ());
00530   CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in() ACE_ENV_ARG_PARAMETER);
00531   ACE_CHECK_RETURN (CosNotifyChannelAdmin::EventChannelFactory::_nil ());
00532 
00533   ACE_TRY_NEW_ENV
00534   {
00535     if (DEBUG_LEVEL > 9)
00536     {
00537       ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) TAO_Notify_EventChannelFactory::activate_self") ));
00538     }
00539     this->reconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
00540     ACE_TRY_CHECK;
00541   }
00542   ACE_CATCHANY
00543   {
00544     // ignore for now
00545   }
00546   ACE_ENDTRY;
00547   return this->channel_factory_._retn();
00548 }
00549 
00550 
00551 TAO_Notify_Object::ID
00552 TAO_Notify_EventChannelFactory::get_id () const
00553 {
00554   return id();
00555 }
00556 
00557 TAO_Notify_EventChannelFactory::TAO_Notify_EventChannel_Container&
00558 TAO_Notify_EventChannelFactory::ec_container()
00559 {
00560   ACE_ASSERT( this->ec_container_.get() != 0 );
00561   return *ec_container_;
00562 }
00563 
00564 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:24:11 2006 for TAO_CosNotification by doxygen 1.3.6