CosNotify_Service.cpp

Go to the documentation of this file.
00001 // $Id: CosNotify_Service.cpp 79227 2007-08-06 13:52:47Z elliott_c $
00002 
00003 #include "orbsvcs/Notify/CosNotify_Service.h"
00004 #include "orbsvcs/Notify/Properties.h"
00005 #include "orbsvcs/Notify/Default_Factory.h"
00006 #include "orbsvcs/Notify/Builder.h"
00007 #include "orbsvcs/Notify/EventChannel.h"
00008 #include "ace/Sched_Params.h"
00009 #include "ace/Arg_Shifter.h"
00010 #include "ace/Dynamic_Service.h"
00011 #include "tao/ORB_Core.h"
00012 #include "orbsvcs/NotifyExtC.h"
00013 #include "tao/debug.h"
00014 
00015 ACE_RCSID (Notify,
00016            TAO_CosNotify_Service,
00017            "$Id: CosNotify_Service.cpp 79227 2007-08-06 13:52:47Z elliott_c $")
00018 
00019 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00020 
00021 TAO_CosNotify_Service::TAO_CosNotify_Service (void)
00022 {
00023 }
00024 
00025 TAO_CosNotify_Service::~TAO_CosNotify_Service (void)
00026 {
00027 }
00028 
00029 int
00030 TAO_CosNotify_Service::init (int argc, ACE_TCHAR *argv[])
00031 {
00032   ACE_Arg_Shifter arg_shifter (argc, argv);
00033 
00034   const ACE_TCHAR *current_arg = 0;
00035 
00036   // Default to an all reactive system.
00037   int ec_threads = 0;
00038   int consumer_threads = 0;
00039   int supplier_threads = 0;
00040 
00041   int task_per_proxy = 0;
00042 
00043   TAO_Notify_Properties *properties = TAO_Notify_PROPERTIES::instance();
00044 
00045   while (arg_shifter.is_anything_left ())
00046     {
00047       if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-MTDispatching")) == 0)
00048         {
00049           arg_shifter.consume_arg ();
00050           ACE_DEBUG ((LM_DEBUG,
00051                       ACE_TEXT ("(%P|%t) The -MTDispatching option has been deprecated, use -DispatchingThreads \n")));
00052         }
00053       else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-DispatchingThreads"))))
00054         {
00055           consumer_threads += ACE_OS::atoi (current_arg);
00056           arg_shifter.consume_arg ();
00057         }
00058       else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-MTSourceEval")) == 0)
00059         {
00060           arg_shifter.consume_arg ();
00061           ACE_DEBUG ((LM_DEBUG,
00062                       ACE_TEXT ("(%P|%t) The -MTSourceEval option has been deprecated, use -SourceThreads \n")));
00063         }
00064       else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-SourceThreads"))))
00065         {
00066           supplier_threads += ACE_OS::atoi (current_arg);
00067           arg_shifter.consume_arg ();
00068         }
00069       else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-MTLookup")) == 0)
00070         {
00071           arg_shifter.consume_arg ();
00072           ACE_DEBUG ((LM_DEBUG,
00073                       ACE_TEXT ("(%P|%t) The -MTLookup option has been deprecated, use -SourceThreads \n")));
00074         }
00075       else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-LookupThreads"))))
00076         {
00077           supplier_threads += ACE_OS::atoi (current_arg);
00078           arg_shifter.consume_arg ();
00079           ACE_DEBUG ((LM_DEBUG,
00080                       ACE_TEXT ("(%P|%t) The -LookupThreads option has been deprecated, use -SourceThreads \n")));
00081         }
00082       else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-MTListenerEval")) == 0)
00083         {
00084           arg_shifter.consume_arg ();
00085           ACE_DEBUG ((LM_DEBUG,
00086                       ACE_TEXT ("(%P|%t) The -MTListenerEval option has been deprecated, use -DispatchingThreads \n")));
00087         }
00088       else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-ListenerThreads"))))
00089         {
00090           // Since this option is always added to consumer_threads, we'll
00091           // deprecate it in favor of that option.
00092           ACE_DEBUG ((LM_DEBUG,
00093                       ACE_TEXT ("(%P|%t) The -ListenerThreads option has been deprecated, use -DispatchingThreads \n")));
00094           consumer_threads += ACE_OS::atoi (current_arg);
00095           arg_shifter.consume_arg ();
00096         }
00097       else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-AsynchUpdates")) == 0)
00098         {
00099           arg_shifter.consume_arg ();
00100 
00101           properties->asynch_updates (1);
00102         }
00103       else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-NoUpdates")) == 0)
00104         {
00105           arg_shifter.consume_arg ();
00106 
00107           properties->updates (0);
00108         }
00109       else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-AllocateTaskperProxy")) == 0)
00110         {
00111           task_per_proxy = 1;
00112           arg_shifter.consume_arg ();
00113         }
00114       else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-UseSeparateDispatchingORB")) == 0)
00115         {
00116           current_arg = arg_shifter.get_the_parameter
00117                                 (ACE_TEXT("-UseSeparateDispatchingORB"));
00118           if (current_arg != 0 &&
00119               (ACE_OS::strcmp(ACE_TEXT ("0"), current_arg) == 0 ||
00120                ACE_OS::strcmp(ACE_TEXT ("1"), current_arg) == 0))
00121             {
00122               properties->separate_dispatching_orb (
00123                             static_cast<bool> (ACE_OS::atoi(current_arg)));
00124               ACE_DEBUG ((LM_DEBUG,
00125                           ACE_TEXT ("Using separate Dispatching ORB\n")));
00126             }
00127           else
00128             {
00129               ACE_DEBUG ((LM_DEBUG,
00130                           ACE_TEXT ("(%P|%t) WARNING: Unrecognized ")
00131                           ACE_TEXT ("argument (%s).  Ignoring invalid ")
00132                           ACE_TEXT ("-UseSeparateDispatchingORB usage.\n"),
00133                           (current_arg == 0 ? ACE_TEXT ("''") : current_arg)));
00134             }
00135           if (current_arg != 0)
00136             arg_shifter.consume_arg ();
00137         }
00138       else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-AllowReconnect")) == 0)
00139       {
00140         arg_shifter.consume_arg ();
00141         TAO_Notify_PROPERTIES::instance()->allow_reconnect (true);
00142       }
00143       else
00144       {
00145         ACE_ERROR ((LM_ERROR,
00146                     ACE_TEXT ("(%P|%t) Ignoring unknown option for Notify Factory: %s\n"),
00147                     arg_shifter.get_current()
00148           ));
00149         arg_shifter.consume_arg ();
00150       }
00151     }
00152 
00153   // Init the EC QoS
00154   {
00155     CosNotification::QoSProperties qos;
00156     this->set_threads (qos, ec_threads);
00157     properties->default_event_channel_qos_properties (qos);
00158   }
00159 
00160   if (task_per_proxy == 0)
00161     {
00162       // Set the per ConsumerAdmin QoS
00163       {
00164         if (consumer_threads > 0)
00165           ACE_DEBUG((LM_DEBUG, "Using %d threads for each ConsumerAdmin.\n", consumer_threads));
00166         CosNotification::QoSProperties qos;
00167         this->set_threads (qos, consumer_threads);
00168         properties->default_consumer_admin_qos_properties (qos);
00169       }
00170 
00171       // Set the per SupplierAdmin QoS
00172       {
00173         if (supplier_threads > 0)
00174           ACE_DEBUG((LM_DEBUG, "Using %d threads for each SupplierAdmin.\n", supplier_threads));
00175         CosNotification::QoSProperties qos;
00176         this->set_threads (qos, supplier_threads);
00177         properties->default_supplier_admin_qos_properties (qos);
00178       }
00179     }
00180   else
00181     {
00182       // Set the per ProxyConsumer QoS
00183       {
00184         if (supplier_threads > 0)
00185           ACE_DEBUG((LM_DEBUG, "Using %d threads for each Supplier.\n", supplier_threads));
00186         CosNotification::QoSProperties qos;
00187         this->set_threads (qos, supplier_threads); // lookup thread per proxy doesn't make sense.
00188         properties->default_proxy_consumer_qos_properties (qos);
00189       }
00190 
00191       // Set the per ProxySupplier QoS
00192       {
00193         if (consumer_threads > 0)
00194           ACE_DEBUG((LM_DEBUG, "Using %d threads for each Consumer.\n", consumer_threads));
00195         CosNotification::QoSProperties qos;
00196         this->set_threads (qos, consumer_threads);
00197         properties->default_proxy_supplier_qos_properties (qos);
00198       }
00199     }
00200 
00201   return 0;
00202 }
00203 
00204 void
00205 TAO_CosNotify_Service::set_threads (CosNotification::QoSProperties &qos, int threads)
00206 {
00207   NotifyExt::ThreadPoolParams tp_params =
00208     {NotifyExt::CLIENT_PROPAGATED, 0, 0, (unsigned)threads, 0, 0, 0, 0, 0 };
00209 
00210   qos.length (1);
00211   qos[0].name = CORBA::string_dup (NotifyExt::ThreadPool);
00212   qos[0].value <<= tp_params;
00213 }
00214 
00215 int
00216 TAO_CosNotify_Service::fini (void)
00217 {
00218   return 0;
00219 }
00220 
00221 void
00222 TAO_CosNotify_Service::init_service (CORBA::ORB_ptr orb)
00223 {
00224   ACE_DEBUG ((LM_DEBUG, "Loading the Cos Notification Service...\n"));
00225 
00226   if (TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb())
00227     {
00228       // got here by way of svc.conf. no second orb supplied so create one
00229       if (0 == TAO_Notify_PROPERTIES::instance()->dispatching_orb())
00230         {
00231           ACE_DEBUG ((LM_DEBUG, "No dispatching orb supplied. Creating default one.\n"));
00232 
00233           int argc = 0;
00234           char *argv0 = 0;
00235           char **argv = &argv0;  // ansi requires argv be null terminated.
00236           CORBA::ORB_var dispatcher = CORBA::ORB_init (argc, argv,
00237                                                        "default_dispatcher");
00238 
00239           TAO_Notify_PROPERTIES::instance()->dispatching_orb(dispatcher.in());
00240         }
00241 
00242       this->init_i2 (orb, TAO_Notify_PROPERTIES::instance()->dispatching_orb());
00243 
00244     }
00245   else
00246     {
00247       this->init_i (orb);
00248     }
00249 }
00250 
00251 void
00252 TAO_CosNotify_Service::init_service2 (CORBA::ORB_ptr orb, CORBA::ORB_ptr dispatching_orb)
00253 {
00254   this->init_i2 (orb, dispatching_orb);
00255 }
00256 
00257 void
00258 TAO_CosNotify_Service::finalize_service (
00259                    CosNotifyChannelAdmin::EventChannelFactory_ptr factory)
00260 {
00261   // Get out early if we can
00262   if (CORBA::is_nil (factory))
00263     return;
00264 
00265   // Make sure the factory doesn't go away while we're in here
00266   CosNotifyChannelAdmin::EventChannelFactory_var ecf =
00267     CosNotifyChannelAdmin::EventChannelFactory::_duplicate (factory);
00268 
00269   // Find all the consumer admin objects and shutdown the worker tasks
00270   CosNotifyChannelAdmin::ChannelIDSeq_var channels =
00271     ecf->get_all_channels ();
00272   CORBA::ULong length = channels->length ();
00273   for(CORBA::ULong i = 0; i < length; i++)
00274     {
00275       try
00276         {
00277           CosNotifyChannelAdmin::EventChannel_var ec =
00278             ecf->get_event_channel (channels[i]);
00279           if (!CORBA::is_nil (ec.in ()))
00280             {
00281               TAO_Notify_EventChannel* nec =
00282                 dynamic_cast<TAO_Notify_EventChannel*> (ec->_servant ());
00283               if (nec != 0)
00284                 nec->destroy ();
00285             }
00286         }
00287       catch (const CORBA::Exception&)
00288         {
00289           // We're shutting things down, so ignore exceptions
00290         }
00291     }
00292 }
00293 
00294 void
00295 TAO_CosNotify_Service::init_i (CORBA::ORB_ptr orb)
00296 {
00297   // Obtain the Root POA
00298   CORBA::Object_var object  =
00299     orb->resolve_initial_references("RootPOA");
00300 
00301   if (CORBA::is_nil (object.in ()))
00302     ACE_ERROR ((LM_ERROR,
00303                 " (%P|%t) Unable to resolve the RootPOA.\n"));
00304 
00305   PortableServer::POA_var default_poa = PortableServer::POA::_narrow (object.in ());
00306 
00307   // Set the properties
00308   TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance();
00309 
00310   properties->orb (orb);
00311   properties->default_poa (default_poa.in ());
00312 
00313   // Init the factory
00314   this->factory_.reset (this->create_factory ());
00315   ACE_ASSERT( this->factory_.get() != 0 );
00316   TAO_Notify_PROPERTIES::instance()->factory (this->factory_.get());
00317 
00318   this->builder_.reset (this->create_builder ());
00319   ACE_ASSERT( this->builder_.get() != 0 );
00320   TAO_Notify_PROPERTIES::instance()->builder (this->builder_.get());
00321 }
00322 
00323 void
00324 TAO_CosNotify_Service::init_i2 (CORBA::ORB_ptr orb, CORBA::ORB_ptr dispatching_orb)
00325 {
00326   // Obtain the Root POA
00327   CORBA::Object_var object  =
00328     orb->resolve_initial_references("RootPOA");
00329 
00330   if (CORBA::is_nil (object.in ()))
00331     ACE_ERROR ((LM_ERROR, " (%P|%t) Unable to resolve the RootPOA.\n"));
00332 
00333   PortableServer::POA_var default_poa = PortableServer::POA::_narrow (object.in ());
00334 
00335   // Set the properties
00336   TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance();
00337 
00338   properties->orb (orb);
00339   properties->dispatching_orb (dispatching_orb);
00340   properties->separate_dispatching_orb (true);
00341 
00342   properties->default_poa (default_poa.in ());
00343 
00344   // Init the factory and builder
00345   this->factory_.reset (this->create_factory ());
00346   ACE_ASSERT( this->factory_.get() != 0 );
00347   TAO_Notify_PROPERTIES::instance()->factory (this->factory_.get());
00348 
00349   this->builder_.reset (this->create_builder ());
00350   ACE_ASSERT( this->builder_.get() != 0 );
00351   TAO_Notify_PROPERTIES::instance()->builder (this->builder_.get());
00352 }
00353 
00354 TAO_Notify_Factory*
00355 TAO_CosNotify_Service::create_factory (void)
00356 {
00357   TAO_Notify_Factory* factory = ACE_Dynamic_Service<TAO_Notify_Factory>::instance ("TAO_Notify_Factory");
00358   if (factory == 0)
00359     {
00360        ACE_NEW_THROW_EX (factory,
00361                          TAO_Notify_Default_Factory (),
00362                          CORBA::NO_MEMORY ());
00363     }
00364   return factory;
00365 }
00366 
00367 TAO_Notify_Builder*
00368 TAO_CosNotify_Service::create_builder (void)
00369 {
00370   TAO_Notify_Builder* builder = 0;
00371   ACE_NEW_THROW_EX (builder,
00372                     TAO_Notify_Builder (),
00373                     CORBA::NO_MEMORY ());
00374 
00375   return builder;
00376 }
00377 
00378 CosNotifyChannelAdmin::EventChannelFactory_ptr
00379 TAO_CosNotify_Service::create (PortableServer::POA_ptr poa,
00380                                const char* factory_name)
00381 {
00382   return this->builder().build_event_channel_factory (poa, factory_name);
00383 }
00384 
00385 void
00386 TAO_CosNotify_Service::remove (TAO_Notify_EventChannelFactory* /*ecf*/)
00387 {
00388   // NOP.
00389 }
00390 
00391 TAO_Notify_Factory&
00392 TAO_CosNotify_Service::factory (void)
00393 {
00394   ACE_ASSERT( this->factory_.get() != 0 );
00395   return *this->factory_;
00396 }
00397 
00398 TAO_Notify_Builder&
00399 TAO_CosNotify_Service::builder (void)
00400 {
00401   ACE_ASSERT( this->builder_.get() != 0 );
00402   return *this->builder_;
00403 }
00404 
00405 TAO_END_VERSIONED_NAMESPACE_DECL
00406 
00407 
00408 /*********************************************************************************************************************/
00409 
00410 ACE_STATIC_SVC_DEFINE (TAO_Notify_Default_EMO_Factory_OLD,
00411                        ACE_TEXT (TAO_NOTIFY_DEF_EMO_FACTORY_NAME),
00412                        ACE_SVC_OBJ_T,
00413                        &ACE_SVC_NAME (TAO_CosNotify_Service),
00414                        ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ,
00415                        0)
00416 
00417 /*********************************************************************************************************************/
00418 
00419 ACE_STATIC_SVC_DEFINE (TAO_CosNotify_Service,
00420                        ACE_TEXT (TAO_COS_NOTIFICATION_SERVICE_NAME),
00421                        ACE_SVC_OBJ_T,
00422                        &ACE_SVC_NAME (TAO_CosNotify_Service),
00423                        ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ,
00424                        0)
00425 
00426 
00427 ACE_FACTORY_DEFINE (TAO_Notify_Serv, TAO_CosNotify_Service)
00428 
00429 /*********************************************************************************************************************/

Generated on Sun Jan 27 15:39:53 2008 for TAO_CosNotification by doxygen 1.3.6