EventChannelFactory.cpp

Go to the documentation of this file.
00001 // $Id: EventChannelFactory.cpp 79247 2007-08-07 15:24:22Z elliott_c $
00002 
00003 #include "orbsvcs/Notify/EventChannelFactory.h"
00004 
00005 ACE_RCSID(Notify, TAO_Notify_EventChannelFactory, "$Id: EventChannelFactory.cpp 79247 2007-08-07 15:24:22Z elliott_c $")
00006 
00007 #include "orbsvcs/Notify/Properties.h"
00008 #include "orbsvcs/Notify/Factory.h"
00009 #include "orbsvcs/Notify/Builder.h"
00010 #include "orbsvcs/Notify/Topology_Saver.h"
00011 #include "orbsvcs/Notify/Topology_Loader.h"
00012 #include "orbsvcs/Notify/Save_Persist_Worker_T.h"
00013 #include "orbsvcs/Notify/Reconnect_Worker_T.h"
00014 #include "orbsvcs/Notify/Event_Persistence_Strategy.h"
00015 #include "orbsvcs/Notify/Routing_Slip_Persistence_Manager.h"
00016 #include "orbsvcs/Notify/EventChannel.h"
00017 #include "orbsvcs/Notify/Container_T.h"
00018 #include "orbsvcs/Notify/Find_Worker_T.h"
00019 #include "orbsvcs/Notify/Seq_Worker_T.h"
00020 #include "orbsvcs/Notify/POA_Helper.h"
00021 
00022 #include "ace/Dynamic_Service.h"
00023 
00024 #include "tao/debug.h"
00025 //#define DEBUG_LEVEL 9
00026 #ifndef DEBUG_LEVEL
00027 # define DEBUG_LEVEL TAO_debug_level
00028 #endif //DEBUG_LEVEL
00029 
00030 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00031 
00032 // Include this here since this is the only file that
00033 // requires Topology_Factory.
00034 namespace TAO_Notify
00035 {
00036   // virtual
00037   Topology_Factory::~Topology_Factory ()
00038   {
00039   }
00040 }
00041 
00042 typedef TAO_Notify_Find_Worker_T<TAO_Notify_EventChannel
00043                              , CosNotifyChannelAdmin::EventChannel
00044                              , CosNotifyChannelAdmin::EventChannel_ptr
00045                              , CosNotifyChannelAdmin::ChannelNotFound>
00046 TAO_Notify_EventChannel_Find_Worker;
00047 
00048 typedef TAO_Notify_Seq_Worker_T<TAO_Notify_EventChannel> TAO_Notify_EventChannel_Seq_Worker;
00049 
00050 TAO_Notify_EventChannelFactory::TAO_Notify_EventChannelFactory (void)
00051   : topology_save_seq_ (0)
00052   , topology_factory_(0)
00053   , reconnect_registry_(*this)
00054   , loading_topology_ (false)
00055 {
00056 }
00057 
00058 TAO_Notify_EventChannelFactory::~TAO_Notify_EventChannelFactory ()
00059 {
00060 }
00061 
00062 void
00063 TAO_Notify_EventChannelFactory::destroy (void)
00064 {
00065   int result = this->shutdown ();
00066   if ( result == 1)
00067     return;
00068 
00069   TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance();
00070 
00071   // Reset references to CORBA objects.
00072   properties->orb (CORBA::ORB::_nil ());
00073   properties->default_poa (PortableServer::POA::_nil ());
00074 
00075   ec_container_.reset( 0 );
00076 }
00077 
00078 void
00079 TAO_Notify_EventChannelFactory::init (PortableServer::POA_ptr poa)
00080 {
00081   ACE_ASSERT (this->ec_container_.get() == 0);
00082 
00083   this->default_filter_factory_ =
00084     TAO_Notify_PROPERTIES::instance()->builder()->build_filter_factory ();
00085 
00086   // Init ec_container_
00087   TAO_Notify_EventChannel_Container* ecc = 0;
00088   ACE_NEW_THROW_EX (ecc,
00089                     TAO_Notify_EventChannel_Container (),
00090                     CORBA::INTERNAL ());
00091   this->ec_container_.reset( ecc );
00092 
00093   this->ec_container().init ();
00094 
00095   TAO_Notify_POA_Helper* object_poa = 0;
00096 
00097   // Bootstrap initial Object POA
00098   ACE_NEW_THROW_EX (object_poa,
00099                     TAO_Notify_POA_Helper (),
00100                     CORBA::NO_MEMORY ());
00101 
00102   ACE_Auto_Ptr<TAO_Notify_POA_Helper> auto_object_poa (object_poa);
00103 
00104   ACE_CString poa_name = object_poa->get_unique_id ();
00105 #if defined (CORBA_E_MICRO)
00106   object_poa->init (poa, poa_name.c_str ());
00107 #else
00108   object_poa->init_persistent (poa, poa_name.c_str ());
00109 #endif /* CORBA_E_MICRO */
00110 
00111   this->adopt_poa (auto_object_poa.release ());
00112 
00113   // Note topology factory is configured separately from the "builder" mediated
00114   // objects since it is independant of the "style" of Notification Service.
00115   this->topology_factory_ =
00116     ACE_Dynamic_Service <TAO_Notify::Topology_Factory>::instance ("Topology_Factory");
00117 
00118   this->load_topology ();
00119 
00120   this->load_event_persistence ();
00121 }
00122 
00123 void
00124 TAO_Notify_EventChannelFactory::_add_ref (void)
00125 {
00126   this->_incr_refcnt ();
00127 }
00128 
00129 void
00130 TAO_Notify_EventChannelFactory::_remove_ref (void)
00131 {
00132   this->_decr_refcnt ();
00133 }
00134 
00135 void
00136 TAO_Notify_EventChannelFactory::release (void)
00137 {
00138   delete this;
00139   //@@ inform factory
00140 }
00141 
00142 void
00143 TAO_Notify_EventChannelFactory::remove (TAO_Notify_EventChannel* event_channel)
00144 {
00145   this->ec_container().remove (event_channel);
00146   this->self_change ();
00147 }
00148 
00149 int
00150 TAO_Notify_EventChannelFactory::shutdown (void)
00151 {
00152   int sd_ret = TAO_Notify_Object::shutdown ();
00153 
00154   if (sd_ret == 1)
00155     return 1;
00156 
00157   this->ec_container().shutdown ();
00158 
00159   return 0;
00160 }
00161 
00162 CosNotifyFilter::FilterFactory_ptr
00163 TAO_Notify_EventChannelFactory::get_default_filter_factory (void)
00164 {
00165   return CosNotifyFilter::FilterFactory::_duplicate (this->default_filter_factory_.in ());
00166 }
00167 
00168 CosNotifyChannelAdmin::EventChannel_ptr
00169 TAO_Notify_EventChannelFactory::create_named_channel (
00170     const CosNotification::QoSProperties& initial_qos,
00171     const CosNotification::AdminProperties& initial_admin,
00172     CosNotifyChannelAdmin::ChannelID_out id,
00173     const char*)
00174 {
00175   return this->create_channel (initial_qos, initial_admin, id);
00176 }
00177 
00178 ::CosNotifyChannelAdmin::EventChannel_ptr TAO_Notify_EventChannelFactory::create_channel (
00179     const CosNotification::QoSProperties & initial_qos,
00180     const CosNotification::AdminProperties & initial_admin,
00181     CosNotifyChannelAdmin::ChannelID_out id
00182   )
00183 {
00184   CosNotifyChannelAdmin::EventChannel_var ec =
00185     TAO_Notify_PROPERTIES::instance()->builder()->build_event_channel (this
00186                                                                         , initial_qos
00187                                                                         , initial_admin
00188                                                                         , id);
00189   this->self_change ();
00190   return ec._retn ();
00191 }
00192 
00193 CosNotifyChannelAdmin::ChannelIDSeq*
00194 TAO_Notify_EventChannelFactory::get_all_channels (void)
00195 {
00196   TAO_Notify_EventChannel_Seq_Worker seq_worker;
00197 
00198   return seq_worker.create (this->ec_container());
00199 }
00200 
00201 CosNotifyChannelAdmin::EventChannel_ptr
00202 TAO_Notify_EventChannelFactory::get_event_channel (CosNotifyChannelAdmin::ChannelID id)
00203 {
00204   TAO_Notify_EventChannel_Find_Worker find_worker;
00205 
00206   return find_worker.resolve (id, this->ec_container());
00207 }
00208 
00209 void
00210 TAO_Notify_EventChannelFactory::set_topology_factory(TAO_Notify::Topology_Factory* f)
00211 {
00212   ACE_DEBUG ((LM_DEBUG,
00213     ACE_TEXT ("(%P,%t) Debug Topology_Factory installed in EventChannelFactory.\n")
00214     ));
00215   // If the above meessage appears when you don't expect it
00216   // use svc.conf to install the topology factory rather
00217   // than calling this method.
00218   this->topology_factory_ = f;
00219 }
00220 
00221 void
00222 TAO_Notify_EventChannelFactory::load_topology (void)
00223 {
00224   this->loading_topology_ = true;
00225   if (this->topology_factory_ != 0)
00226   {
00227     // create_loader will open and load the persistence file for validation
00228     auto_ptr<TAO_Notify::Topology_Loader> tl(this->topology_factory_->create_loader());
00229     if (tl.get () != 0)
00230     {
00231       tl->load (this);
00232     }
00233   }
00234   else
00235   {
00236     if (TAO_debug_level > 0)
00237       ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Topology persistence disabled.\n")));
00238   }
00239   this->loading_topology_ = false;
00240 }
00241 bool
00242 TAO_Notify_EventChannelFactory::is_persistent () const
00243 {
00244   return true;
00245 }
00246 
00247 void
00248 TAO_Notify_EventChannelFactory::save_persistent (TAO_Notify::Topology_Saver& saver)
00249 {
00250   bool changed = this->self_changed_;
00251   this->self_changed_ = false;
00252   this->children_changed_ = false;
00253 
00254   TAO_Notify::NVPList attrs; // ECF has no attributes
00255 
00256   bool want_all_children =
00257     saver.begin_object(0, "channel_factory", attrs, changed);
00258 
00259   // for each deleted child
00260   //  delete_child  // if the child has persistence.
00261 
00262   TAO_Notify::Save_Persist_Worker<TAO_Notify_EventChannel> wrk(saver, want_all_children);
00263 
00264   this->ec_container().collection()->for_each(&wrk);
00265 
00266   if (want_all_children || this->reconnect_registry_.is_changed ())
00267   {
00268     this->reconnect_registry_.save_persistent(saver);
00269   }
00270   saver.end_object(0, "channel_factory");
00271 }
00272 
00273 void
00274 TAO_Notify_EventChannelFactory::load_event_persistence (void)
00275 {
00276   TAO_Notify::Event_Persistence_Strategy * strategy =
00277     ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence");
00278   if (strategy != 0)
00279   {
00280     if (this->topology_factory_ != 0)
00281     {
00282       TAO_Notify::Event_Persistence_Factory * factory = strategy->get_factory ();
00283       if (factory != 0)
00284       {
00285         for (
00286           TAO_Notify::Routing_Slip_Persistence_Manager * rspm = factory->first_reload_manager();
00287           rspm != 0;
00288           rspm = rspm->load_next ())
00289         {
00290           TAO_Notify::Routing_Slip_Ptr routing_slip = TAO_Notify::Routing_Slip::create (*this, rspm);
00291           if (!routing_slip.null ())
00292           {
00293             this->routing_slip_restart_set_.insert (routing_slip);
00294           }
00295           else
00296           {
00297             //@@todo: tell the rspm it's an orphan, but we can't during reload
00298             // we need collect these and come back later to remove them
00299             ACE_DEBUG ((LM_DEBUG,
00300               ACE_TEXT ("(%P|%t) Reload persistent event failed.\n")
00301               ));
00302           }
00303         }
00304       }
00305     }
00306     else
00307     {
00308       ACE_ERROR ((LM_ERROR,
00309         ACE_TEXT ("(%P|%t) Notify Service: Configuration error.  Event Persistence requires Topology Persistence.\n")
00310         ));
00311       throw CORBA::PERSIST_STORE();
00312     }
00313   }
00314 }
00315 
00316 bool
00317 TAO_Notify_EventChannelFactory::change_to_parent (void)
00318 {
00319   bool saving = false;
00320   if (! this->loading_topology_)
00321   {
00322     // A null pointer means that saving of topology is disabled.
00323     if (this->topology_factory_ != 0)
00324     {
00325       saving = true;
00326       // seq is used to check save-in-progress
00327       // if it changes while we're waiting for the lock
00328       // then our change may have already been saved, so
00329       // just return.  Caller will signal change again if necessary.
00330       short seq = this->topology_save_seq_;
00331       ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->topology_save_lock_, CORBA::INTERNAL ());
00332       if (seq == this->topology_save_seq_)
00333       {
00334         auto_ptr<TAO_Notify::Topology_Saver> saver(this->topology_factory_->create_saver());
00335         if (saver.get() != 0)
00336         {
00337           this->save_persistent(*saver);
00338           saver->close ();
00339         }
00340         this->topology_save_seq_ += 1;
00341       }
00342     }
00343   }
00344   return saving;
00345 }
00346 
00347 TAO_Notify::Topology_Object*
00348 TAO_Notify_EventChannelFactory::load_child (const ACE_CString& type,
00349                                         CORBA::Long id,
00350                                         const TAO_Notify::
00351                                         NVPList& attrs)
00352 {
00353   // ignore anything but our valid children (ie channel)
00354   TAO_Notify::Topology_Object * result = this;
00355   if (type == "channel")
00356   {
00357     if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00358       ACE_TEXT ("(%P|%t) EventChannelFactory reload channel %d\n")
00359       , static_cast<int> (id)
00360       ));
00361 
00362     TAO_Notify_Builder* bld = TAO_Notify_PROPERTIES::instance()->builder();
00363     TAO_Notify_EventChannel * ec = bld->build_event_channel(
00364         this ,
00365         id);
00366 
00367     ec->load_attrs (attrs);
00368 
00369     result = ec;
00370   }
00371   else if (type == TAO_Notify::REGISTRY_TYPE)
00372   {
00373     result = & this->reconnect_registry_;
00374   }
00375   return result;
00376 }
00377 
00378 void
00379 TAO_Notify_EventChannelFactory::reconnect (void)
00380 {
00381   // Reconnect all children first
00382   TAO_Notify::Reconnect_Worker<TAO_Notify_EventChannel> wrk;
00383 
00384   this->ec_container().collection()->for_each(&wrk);
00385 
00386   // Then send reconnection announcement to registered clients
00387   ACE_ASSERT (!CORBA::is_nil (this->channel_factory_.in ()));
00388   this->reconnect_registry_.send_reconnect (this->channel_factory_.in ());
00389 
00390   // reactivate events in-progress
00391   Routing_Slip_Set::CONST_ITERATOR iter (this->routing_slip_restart_set_);
00392   TAO_Notify::Routing_Slip_Ptr * routing_slip;
00393   for (iter.first(); iter.next(routing_slip); iter.advance())
00394   {
00395     (*routing_slip)->reconnect();
00396   }
00397   this->routing_slip_restart_set_.reset ();
00398 }
00399 
00400 NotifyExt::ReconnectionRegistry::ReconnectionID
00401 TAO_Notify_EventChannelFactory::register_callback (
00402     NotifyExt::ReconnectionCallback_ptr reconnection)
00403 {
00404   return this->reconnect_registry_.register_callback (
00405     reconnection);
00406 }
00407 
00408 void
00409 TAO_Notify_EventChannelFactory::unregister_callback (
00410     NotifyExt::ReconnectionRegistry::ReconnectionID id)
00411 {
00412   this->reconnect_registry_.unregister_callback (
00413     id);
00414 }
00415 
00416 CORBA::Boolean
00417 TAO_Notify_EventChannelFactory::is_alive (void)
00418 {
00419   return CORBA::Boolean (1);
00420 }
00421 
00422 void
00423 TAO_Notify_EventChannelFactory::save_topology (void)
00424 {
00425   this->self_change ();
00426 }
00427 
00428 TAO_Notify_ProxyConsumer *
00429 TAO_Notify_EventChannelFactory::find_proxy_consumer (TAO_Notify::IdVec & id_path, size_t position)
00430 {
00431   TAO_Notify_ProxyConsumer * result = 0;
00432   size_t path_size = id_path.size ();
00433 
00434   // EventChannelFactory only:  The first id is proably for the ECF itself
00435   // if so, silently consume it.
00436   if (position < path_size && id_path[position] == this->id())
00437   {
00438     ++position;
00439   }
00440   if (position < path_size)
00441   {
00442     TAO_Notify_EventChannel_Find_Worker find_worker;
00443 
00444     TAO_Notify_EventChannel * ec = find_worker.find (id_path[position], this->ec_container());
00445     ++position;
00446     if (ec != 0)
00447     {
00448       result = ec->find_proxy_consumer (id_path, position);
00449     }
00450   }
00451   return result;
00452 }
00453 
00454 TAO_Notify_ProxySupplier *
00455 TAO_Notify_EventChannelFactory::find_proxy_supplier (TAO_Notify::IdVec & id_path, size_t position)
00456 {
00457   TAO_Notify_ProxySupplier * result = 0;
00458   size_t path_size = id_path.size ();
00459 
00460   // EventChannelFactory only:  The first id is proably for the ECF itself
00461   // if so, silently consume it.
00462   if (position < path_size && id_path[position] == this->id())
00463   {
00464     ++position;
00465   }
00466   if (position < path_size)
00467   {
00468     TAO_Notify_EventChannel_Find_Worker find_worker;
00469     TAO_Notify_EventChannel * ec = find_worker.find (id_path[position], this->ec_container());
00470     ++position;
00471     if (ec != 0)
00472     {
00473       result = ec->find_proxy_supplier (id_path, position);
00474     }
00475   }
00476   return result;
00477 }
00478 
00479 CosNotifyChannelAdmin::EventChannelFactory_ptr
00480 TAO_Notify_EventChannelFactory::activate_self (void)
00481 {
00482   CORBA::Object_var obj = this->activate (this);
00483   this->channel_factory_
00484     = CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in());
00485 
00486   try
00487   {
00488     if (DEBUG_LEVEL > 9)
00489     {
00490       ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) TAO_Notify_EventChannelFactory::activate_self") ));
00491     }
00492     this->reconnect ();
00493   }
00494   catch (const CORBA::Exception&)
00495   {
00496     // ignore for now
00497   }
00498   return this->channel_factory_._retn();
00499 }
00500 
00501 
00502 TAO_Notify_Object::ID
00503 TAO_Notify_EventChannelFactory::get_id () const
00504 {
00505   return id();
00506 }
00507 
00508 TAO_Notify_EventChannelFactory::TAO_Notify_EventChannel_Container&
00509 TAO_Notify_EventChannelFactory::ec_container()
00510 {
00511   ACE_ASSERT( this->ec_container_.get() != 0 );
00512   return *ec_container_;
00513 }
00514 
00515 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Sun Jan 27 15:39:55 2008 for TAO_CosNotification by doxygen 1.3.6