EventChannelFactory.cpp

Go to the documentation of this file.
00001 // $Id: EventChannelFactory.cpp 80974 2008-03-17 13:40:44Z johnnyw $
00002 
00003 #include "orbsvcs/Notify/EventChannelFactory.h"
00004 
00005 ACE_RCSID(Notify,
00006           TAO_Notify_EventChannelFactory,
00007           "$Id: EventChannelFactory.cpp 80974 2008-03-17 13:40:44Z johnnyw $")
00008 
00009 #include "orbsvcs/Notify/Properties.h"
00010 #include "orbsvcs/Notify/Factory.h"
00011 #include "orbsvcs/Notify/Builder.h"
00012 #include "orbsvcs/Notify/Topology_Saver.h"
00013 #include "orbsvcs/Notify/Topology_Loader.h"
00014 #include "orbsvcs/Notify/Save_Persist_Worker_T.h"
00015 #include "orbsvcs/Notify/Reconnect_Worker_T.h"
00016 #include "orbsvcs/Notify/Event_Persistence_Strategy.h"
00017 #include "orbsvcs/Notify/Routing_Slip_Persistence_Manager.h"
00018 #include "orbsvcs/Notify/EventChannel.h"
00019 #include "orbsvcs/Notify/Container_T.h"
00020 #include "orbsvcs/Notify/Find_Worker_T.h"
00021 #include "orbsvcs/Notify/Seq_Worker_T.h"
00022 #include "orbsvcs/Notify/POA_Helper.h"
00023 
00024 #include "ace/Dynamic_Service.h"
00025 
00026 #include "tao/debug.h"
// Verbosity threshold used by the diagnostics in this file.  Unless a
// fixed value is supplied (uncomment the define below, or define
// DEBUG_LEVEL on the compiler command line) it follows TAO_debug_level.
//#define DEBUG_LEVEL 9
#if !defined (DEBUG_LEVEL)
# define DEBUG_LEVEL TAO_debug_level
#endif /* DEBUG_LEVEL */
00031 
00032 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00033 
00034 // Include this here since this is the only file that
00035 // requires Topology_Factory.
00036 namespace TAO_Notify
00037 {
00038   // virtual
00039   Topology_Factory::~Topology_Factory ()
00040   {
00041   }
00042 }
00043 
00044 typedef TAO_Notify_Find_Worker_T<TAO_Notify_EventChannel
00045                              , CosNotifyChannelAdmin::EventChannel
00046                              , CosNotifyChannelAdmin::EventChannel_ptr
00047                              , CosNotifyChannelAdmin::ChannelNotFound>
00048 TAO_Notify_EventChannel_Find_Worker;
00049 
00050 typedef TAO_Notify_Seq_Worker_T<TAO_Notify_EventChannel> TAO_Notify_EventChannel_Seq_Worker;
00051 
00052 TAO_Notify_EventChannelFactory::TAO_Notify_EventChannelFactory (void)
00053   : topology_save_seq_ (0)
00054   , topology_factory_(0)
00055   , reconnect_registry_(*this)
00056   , loading_topology_ (false)
00057 {
00058 }
00059 
00060 TAO_Notify_EventChannelFactory::~TAO_Notify_EventChannelFactory ()
00061 {
00062 }
00063 
00064 void
00065 TAO_Notify_EventChannelFactory::destroy (void)
00066 {
00067   if (this->shutdown () == 1)
00068     return;
00069 
00070   TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance();
00071 
00072   // Reset references to CORBA objects.
00073   properties->orb (CORBA::ORB::_nil ());
00074   properties->default_poa (PortableServer::POA::_nil ());
00075 
00076   ec_container_.reset( 0 );
00077 }
00078 
00079 void
00080 TAO_Notify_EventChannelFactory::init (PortableServer::POA_ptr poa)
00081 {
00082   ACE_ASSERT (this->ec_container_.get() == 0);
00083 
00084   // Init ec_container_
00085   TAO_Notify_EventChannel_Container* ecc = 0;
00086   ACE_NEW_THROW_EX (ecc,
00087                     TAO_Notify_EventChannel_Container (),
00088                     CORBA::INTERNAL ());
00089   this->ec_container_.reset( ecc );
00090 
00091   this->ec_container().init ();
00092 
00093   TAO_Notify_POA_Helper* object_poa = 0;
00094 
00095   // Bootstrap initial Object POA
00096   ACE_NEW_THROW_EX (object_poa,
00097                     TAO_Notify_POA_Helper (),
00098                     CORBA::NO_MEMORY ());
00099 
00100   ACE_Auto_Ptr<TAO_Notify_POA_Helper> auto_object_poa (object_poa);
00101 
00102   ACE_CString poa_name = object_poa->get_unique_id ();
00103 #if defined (CORBA_E_MICRO)
00104   object_poa->init (poa, poa_name.c_str ());
00105 #else
00106   object_poa->init_persistent (poa, poa_name.c_str ());
00107 #endif /* CORBA_E_MICRO */
00108 
00109   this->adopt_poa (auto_object_poa.release ());
00110 
00111   // Note topology factory is configured separately from the "builder" mediated
00112   // objects since it is independant of the "style" of Notification Service.
00113   this->topology_factory_ =
00114     ACE_Dynamic_Service <TAO_Notify::Topology_Factory>::instance ("Topology_Factory");
00115 
00116   this->load_topology ();
00117 
00118   this->load_event_persistence ();
00119 }
00120 
00121 void
00122 TAO_Notify_EventChannelFactory::_add_ref (void)
00123 {
00124   this->_incr_refcnt ();
00125 }
00126 
00127 void
00128 TAO_Notify_EventChannelFactory::_remove_ref (void)
00129 {
00130   this->_decr_refcnt ();
00131 }
00132 
00133 void
00134 TAO_Notify_EventChannelFactory::release (void)
00135 {
00136   delete this;
00137   //@@ inform factory
00138 }
00139 
00140 void
00141 TAO_Notify_EventChannelFactory::remove (TAO_Notify_EventChannel* event_channel)
00142 {
00143   this->ec_container().remove (event_channel);
00144   this->self_change ();
00145 }
00146 
00147 int
00148 TAO_Notify_EventChannelFactory::shutdown (void)
00149 {
00150   if (TAO_Notify_Object::shutdown () == 1)
00151     return 1;
00152 
00153   this->ec_container().shutdown ();
00154 
00155   return 0;
00156 }
00157 
00158 CosNotifyChannelAdmin::EventChannel_ptr
00159 TAO_Notify_EventChannelFactory::create_named_channel (
00160     const CosNotification::QoSProperties& initial_qos,
00161     const CosNotification::AdminProperties& initial_admin,
00162     CosNotifyChannelAdmin::ChannelID_out id,
00163     const char*)
00164 {
00165   return this->create_channel (initial_qos, initial_admin, id);
00166 }
00167 
00168 ::CosNotifyChannelAdmin::EventChannel_ptr TAO_Notify_EventChannelFactory::create_channel (
00169     const CosNotification::QoSProperties & initial_qos,
00170     const CosNotification::AdminProperties & initial_admin,
00171     CosNotifyChannelAdmin::ChannelID_out id
00172   )
00173 {
00174   CosNotifyChannelAdmin::EventChannel_var ec =
00175     TAO_Notify_PROPERTIES::instance()->builder()->build_event_channel (this
00176                                                                         , initial_qos
00177                                                                         , initial_admin
00178                                                                         , id);
00179   this->self_change ();
00180   return ec._retn ();
00181 }
00182 
00183 CosNotifyChannelAdmin::ChannelIDSeq*
00184 TAO_Notify_EventChannelFactory::get_all_channels (void)
00185 {
00186   TAO_Notify_EventChannel_Seq_Worker seq_worker;
00187 
00188   return seq_worker.create (this->ec_container());
00189 }
00190 
00191 CosNotifyChannelAdmin::EventChannel_ptr
00192 TAO_Notify_EventChannelFactory::get_event_channel (CosNotifyChannelAdmin::ChannelID id)
00193 {
00194   TAO_Notify_EventChannel_Find_Worker find_worker;
00195 
00196   return find_worker.resolve (id, this->ec_container());
00197 }
00198 
00199 void
00200 TAO_Notify_EventChannelFactory::set_topology_factory(TAO_Notify::Topology_Factory* f)
00201 {
00202   ACE_DEBUG ((LM_DEBUG,
00203     ACE_TEXT ("(%P,%t) Debug Topology_Factory installed in EventChannelFactory.\n")
00204     ));
00205   // If the above meessage appears when you don't expect it
00206   // use svc.conf to install the topology factory rather
00207   // than calling this method.
00208   this->topology_factory_ = f;
00209 }
00210 
00211 void
00212 TAO_Notify_EventChannelFactory::load_topology (void)
00213 {
00214   this->loading_topology_ = true;
00215   if (this->topology_factory_ != 0)
00216   {
00217     // create_loader will open and load the persistence file for validation
00218     auto_ptr<TAO_Notify::Topology_Loader> tl(this->topology_factory_->create_loader());
00219     if (tl.get () != 0)
00220     {
00221       tl->load (this);
00222     }
00223   }
00224   else
00225   {
00226     if (TAO_debug_level > 0)
00227       ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Topology persistence disabled.\n")));
00228   }
00229   this->loading_topology_ = false;
00230 }
00231 bool
00232 TAO_Notify_EventChannelFactory::is_persistent () const
00233 {
00234   return true;
00235 }
00236 
00237 void
00238 TAO_Notify_EventChannelFactory::save_persistent (TAO_Notify::Topology_Saver& saver)
00239 {
00240   bool changed = this->self_changed_;
00241   this->self_changed_ = false;
00242   this->children_changed_ = false;
00243 
00244   TAO_Notify::NVPList attrs; // ECF has no attributes
00245 
00246   bool want_all_children =
00247     saver.begin_object(0, "channel_factory", attrs, changed);
00248 
00249   // for each deleted child
00250   //  delete_child  // if the child has persistence.
00251 
00252   TAO_Notify::Save_Persist_Worker<TAO_Notify_EventChannel> wrk(saver, want_all_children);
00253 
00254   this->ec_container().collection()->for_each(&wrk);
00255 
00256   if (want_all_children || this->reconnect_registry_.is_changed ())
00257   {
00258     this->reconnect_registry_.save_persistent(saver);
00259   }
00260   saver.end_object(0, "channel_factory");
00261 }
00262 
00263 void
00264 TAO_Notify_EventChannelFactory::load_event_persistence (void)
00265 {
00266   TAO_Notify::Event_Persistence_Strategy * strategy =
00267     ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence");
00268   if (strategy != 0)
00269   {
00270     if (this->topology_factory_ != 0)
00271     {
00272       TAO_Notify::Event_Persistence_Factory * factory = strategy->get_factory ();
00273       if (factory != 0)
00274       {
00275         for (
00276           TAO_Notify::Routing_Slip_Persistence_Manager * rspm = factory->first_reload_manager();
00277           rspm != 0;
00278           rspm = rspm->load_next ())
00279         {
00280           TAO_Notify::Routing_Slip_Ptr routing_slip = TAO_Notify::Routing_Slip::create (*this, rspm);
00281           if (!routing_slip.null ())
00282           {
00283             this->routing_slip_restart_set_.insert (routing_slip);
00284           }
00285           else
00286           {
00287             //@@todo: tell the rspm it's an orphan, but we can't during reload
00288             // we need collect these and come back later to remove them
00289             ACE_DEBUG ((LM_DEBUG,
00290               ACE_TEXT ("(%P|%t) Reload persistent event failed.\n")
00291               ));
00292           }
00293         }
00294       }
00295     }
00296     else
00297     {
00298       ACE_ERROR ((LM_ERROR,
00299         ACE_TEXT ("(%P|%t) Notify Service: Configuration error.  Event Persistence requires Topology Persistence.\n")
00300         ));
00301       throw CORBA::PERSIST_STORE();
00302     }
00303   }
00304 }
00305 
00306 bool
00307 TAO_Notify_EventChannelFactory::change_to_parent (void)
00308 {
00309   bool saving = false;
00310   if (! this->loading_topology_)
00311   {
00312     // A null pointer means that saving of topology is disabled.
00313     if (this->topology_factory_ != 0)
00314     {
00315       saving = true;
00316       // seq is used to check save-in-progress
00317       // if it changes while we're waiting for the lock
00318       // then our change may have already been saved, so
00319       // just return.  Caller will signal change again if necessary.
00320       short seq = this->topology_save_seq_;
00321       ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->topology_save_lock_, CORBA::INTERNAL ());
00322       if (seq == this->topology_save_seq_)
00323       {
00324         auto_ptr<TAO_Notify::Topology_Saver> saver(this->topology_factory_->create_saver());
00325         if (saver.get() != 0)
00326         {
00327           this->save_persistent(*saver);
00328           saver->close ();
00329         }
00330         this->topology_save_seq_ += 1;
00331       }
00332     }
00333   }
00334   return saving;
00335 }
00336 
00337 TAO_Notify::Topology_Object*
00338 TAO_Notify_EventChannelFactory::load_child (const ACE_CString& type,
00339                                         CORBA::Long id,
00340                                         const TAO_Notify::
00341                                         NVPList& attrs)
00342 {
00343   // ignore anything but our valid children (ie channel)
00344   TAO_Notify::Topology_Object * result = this;
00345   if (type == "channel")
00346   {
00347     if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00348       ACE_TEXT ("(%P|%t) EventChannelFactory reload channel %d\n")
00349       , static_cast<int> (id)
00350       ));
00351 
00352     TAO_Notify_Builder* bld = TAO_Notify_PROPERTIES::instance()->builder();
00353     TAO_Notify_EventChannel * ec = bld->build_event_channel(
00354         this ,
00355         id);
00356 
00357     ec->load_attrs (attrs);
00358 
00359     result = ec;
00360   }
00361   else if (type == TAO_Notify::REGISTRY_TYPE)
00362   {
00363     result = & this->reconnect_registry_;
00364   }
00365   return result;
00366 }
00367 
00368 void
00369 TAO_Notify_EventChannelFactory::reconnect (void)
00370 {
00371   // Reconnect all children first
00372   TAO_Notify::Reconnect_Worker<TAO_Notify_EventChannel> wrk;
00373 
00374   this->ec_container().collection()->for_each(&wrk);
00375 
00376   // Then send reconnection announcement to registered clients
00377   ACE_ASSERT (!CORBA::is_nil (this->channel_factory_.in ()));
00378   this->reconnect_registry_.send_reconnect (this->channel_factory_.in ());
00379 
00380   // reactivate events in-progress
00381   Routing_Slip_Set::CONST_ITERATOR iter (this->routing_slip_restart_set_);
00382   TAO_Notify::Routing_Slip_Ptr * routing_slip;
00383   for (iter.first(); iter.next(routing_slip); iter.advance())
00384   {
00385     (*routing_slip)->reconnect();
00386   }
00387   this->routing_slip_restart_set_.reset ();
00388 }
00389 
00390 NotifyExt::ReconnectionRegistry::ReconnectionID
00391 TAO_Notify_EventChannelFactory::register_callback (
00392     NotifyExt::ReconnectionCallback_ptr reconnection)
00393 {
00394   return this->reconnect_registry_.register_callback (
00395     reconnection);
00396 }
00397 
00398 void
00399 TAO_Notify_EventChannelFactory::unregister_callback (
00400     NotifyExt::ReconnectionRegistry::ReconnectionID id)
00401 {
00402   this->reconnect_registry_.unregister_callback (
00403     id);
00404 }
00405 
00406 CORBA::Boolean
00407 TAO_Notify_EventChannelFactory::is_alive (void)
00408 {
00409   return CORBA::Boolean (1);
00410 }
00411 
00412 void
00413 TAO_Notify_EventChannelFactory::save_topology (void)
00414 {
00415   this->self_change ();
00416 }
00417 
00418 TAO_Notify_ProxyConsumer *
00419 TAO_Notify_EventChannelFactory::find_proxy_consumer (TAO_Notify::IdVec & id_path, size_t position)
00420 {
00421   TAO_Notify_ProxyConsumer * result = 0;
00422   size_t path_size = id_path.size ();
00423 
00424   // EventChannelFactory only:  The first id is proably for the ECF itself
00425   // if so, silently consume it.
00426   if (position < path_size && id_path[position] == this->id())
00427   {
00428     ++position;
00429   }
00430   if (position < path_size)
00431   {
00432     TAO_Notify_EventChannel_Find_Worker find_worker;
00433 
00434     TAO_Notify_EventChannel * ec = find_worker.find (id_path[position], this->ec_container());
00435     ++position;
00436     if (ec != 0)
00437     {
00438       result = ec->find_proxy_consumer (id_path, position);
00439     }
00440   }
00441   return result;
00442 }
00443 
00444 TAO_Notify_ProxySupplier *
00445 TAO_Notify_EventChannelFactory::find_proxy_supplier (TAO_Notify::IdVec & id_path, size_t position)
00446 {
00447   TAO_Notify_ProxySupplier * result = 0;
00448   size_t path_size = id_path.size ();
00449 
00450   // EventChannelFactory only:  The first id is proably for the ECF itself
00451   // if so, silently consume it.
00452   if (position < path_size && id_path[position] == this->id())
00453   {
00454     ++position;
00455   }
00456   if (position < path_size)
00457   {
00458     TAO_Notify_EventChannel_Find_Worker find_worker;
00459     TAO_Notify_EventChannel * ec = find_worker.find (id_path[position], this->ec_container());
00460     ++position;
00461     if (ec != 0)
00462     {
00463       result = ec->find_proxy_supplier (id_path, position);
00464     }
00465   }
00466   return result;
00467 }
00468 
00469 CosNotifyChannelAdmin::EventChannelFactory_ptr
00470 TAO_Notify_EventChannelFactory::activate_self (void)
00471 {
00472   CORBA::Object_var obj = this->activate (this);
00473   this->channel_factory_
00474     = CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in());
00475 
00476   try
00477   {
00478     if (DEBUG_LEVEL > 9)
00479     {
00480       ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) TAO_Notify_EventChannelFactory::activate_self") ));
00481     }
00482     this->reconnect ();
00483   }
00484   catch (const CORBA::Exception&)
00485   {
00486     // ignore for now
00487   }
00488   return this->channel_factory_._retn();
00489 }
00490 
00491 
00492 TAO_Notify_Object::ID
00493 TAO_Notify_EventChannelFactory::get_id () const
00494 {
00495   return id();
00496 }
00497 
00498 TAO_Notify_EventChannelFactory::TAO_Notify_EventChannel_Container&
00499 TAO_Notify_EventChannelFactory::ec_container()
00500 {
00501   ACE_ASSERT( this->ec_container_.get() != 0 );
00502   return *ec_container_;
00503 }
00504 
00505 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:45:29 2010 for TAO_CosNotification by  doxygen 1.4.7