00001
00002
00003 #include "orbsvcs/Notify/CosNotify_Service.h"
00004 #include "orbsvcs/Notify/Properties.h"
00005 #include "orbsvcs/Notify/Default_Factory.h"
00006 #include "orbsvcs/Notify/Builder.h"
00007 #include "orbsvcs/Notify/EventChannel.h"
00008 #include "ace/Sched_Params.h"
00009 #include "ace/Arg_Shifter.h"
00010 #include "ace/Dynamic_Service.h"
00011 #include "tao/ORB_Core.h"
00012 #include "orbsvcs/NotifyExtC.h"
00013 #include "tao/debug.h"
00014
00015 ACE_RCSID (Notify,
00016 TAO_CosNotify_Service,
00017 "$Id: CosNotify_Service.cpp 79227 2007-08-06 13:52:47Z elliott_c $")
00018
00019 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00020
00021 TAO_CosNotify_Service::TAO_CosNotify_Service (void)
00022 {
00023 }
00024
00025 TAO_CosNotify_Service::~TAO_CosNotify_Service (void)
00026 {
00027 }
00028
00029 int
00030 TAO_CosNotify_Service::init (int argc, ACE_TCHAR *argv[])
00031 {
00032 ACE_Arg_Shifter arg_shifter (argc, argv);
00033
00034 const ACE_TCHAR *current_arg = 0;
00035
00036
00037 int ec_threads = 0;
00038 int consumer_threads = 0;
00039 int supplier_threads = 0;
00040
00041 int task_per_proxy = 0;
00042
00043 TAO_Notify_Properties *properties = TAO_Notify_PROPERTIES::instance();
00044
00045 while (arg_shifter.is_anything_left ())
00046 {
00047 if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-MTDispatching")) == 0)
00048 {
00049 arg_shifter.consume_arg ();
00050 ACE_DEBUG ((LM_DEBUG,
00051 ACE_TEXT ("(%P|%t) The -MTDispatching option has been deprecated, use -DispatchingThreads \n")));
00052 }
00053 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-DispatchingThreads"))))
00054 {
00055 consumer_threads += ACE_OS::atoi (current_arg);
00056 arg_shifter.consume_arg ();
00057 }
00058 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-MTSourceEval")) == 0)
00059 {
00060 arg_shifter.consume_arg ();
00061 ACE_DEBUG ((LM_DEBUG,
00062 ACE_TEXT ("(%P|%t) The -MTSourceEval option has been deprecated, use -SourceThreads \n")));
00063 }
00064 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-SourceThreads"))))
00065 {
00066 supplier_threads += ACE_OS::atoi (current_arg);
00067 arg_shifter.consume_arg ();
00068 }
00069 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-MTLookup")) == 0)
00070 {
00071 arg_shifter.consume_arg ();
00072 ACE_DEBUG ((LM_DEBUG,
00073 ACE_TEXT ("(%P|%t) The -MTLookup option has been deprecated, use -SourceThreads \n")));
00074 }
00075 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-LookupThreads"))))
00076 {
00077 supplier_threads += ACE_OS::atoi (current_arg);
00078 arg_shifter.consume_arg ();
00079 ACE_DEBUG ((LM_DEBUG,
00080 ACE_TEXT ("(%P|%t) The -LookupThreads option has been deprecated, use -SourceThreads \n")));
00081 }
00082 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-MTListenerEval")) == 0)
00083 {
00084 arg_shifter.consume_arg ();
00085 ACE_DEBUG ((LM_DEBUG,
00086 ACE_TEXT ("(%P|%t) The -MTListenerEval option has been deprecated, use -DispatchingThreads \n")));
00087 }
00088 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-ListenerThreads"))))
00089 {
00090
00091
00092 ACE_DEBUG ((LM_DEBUG,
00093 ACE_TEXT ("(%P|%t) The -ListenerThreads option has been deprecated, use -DispatchingThreads \n")));
00094 consumer_threads += ACE_OS::atoi (current_arg);
00095 arg_shifter.consume_arg ();
00096 }
00097 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-AsynchUpdates")) == 0)
00098 {
00099 arg_shifter.consume_arg ();
00100
00101 properties->asynch_updates (1);
00102 }
00103 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-NoUpdates")) == 0)
00104 {
00105 arg_shifter.consume_arg ();
00106
00107 properties->updates (0);
00108 }
00109 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-AllocateTaskperProxy")) == 0)
00110 {
00111 task_per_proxy = 1;
00112 arg_shifter.consume_arg ();
00113 }
00114 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-UseSeparateDispatchingORB")) == 0)
00115 {
00116 current_arg = arg_shifter.get_the_parameter
00117 (ACE_TEXT("-UseSeparateDispatchingORB"));
00118 if (current_arg != 0 &&
00119 (ACE_OS::strcmp(ACE_TEXT ("0"), current_arg) == 0 ||
00120 ACE_OS::strcmp(ACE_TEXT ("1"), current_arg) == 0))
00121 {
00122 properties->separate_dispatching_orb (
00123 static_cast<bool> (ACE_OS::atoi(current_arg)));
00124 ACE_DEBUG ((LM_DEBUG,
00125 ACE_TEXT ("Using separate Dispatching ORB\n")));
00126 }
00127 else
00128 {
00129 ACE_DEBUG ((LM_DEBUG,
00130 ACE_TEXT ("(%P|%t) WARNING: Unrecognized ")
00131 ACE_TEXT ("argument (%s). Ignoring invalid ")
00132 ACE_TEXT ("-UseSeparateDispatchingORB usage.\n"),
00133 (current_arg == 0 ? ACE_TEXT ("''") : current_arg)));
00134 }
00135 if (current_arg != 0)
00136 arg_shifter.consume_arg ();
00137 }
00138 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-AllowReconnect")) == 0)
00139 {
00140 arg_shifter.consume_arg ();
00141 TAO_Notify_PROPERTIES::instance()->allow_reconnect (true);
00142 }
00143 else
00144 {
00145 ACE_ERROR ((LM_ERROR,
00146 ACE_TEXT ("(%P|%t) Ignoring unknown option for Notify Factory: %s\n"),
00147 arg_shifter.get_current()
00148 ));
00149 arg_shifter.consume_arg ();
00150 }
00151 }
00152
00153
00154 {
00155 CosNotification::QoSProperties qos;
00156 this->set_threads (qos, ec_threads);
00157 properties->default_event_channel_qos_properties (qos);
00158 }
00159
00160 if (task_per_proxy == 0)
00161 {
00162
00163 {
00164 if (consumer_threads > 0)
00165 ACE_DEBUG((LM_DEBUG, "Using %d threads for each ConsumerAdmin.\n", consumer_threads));
00166 CosNotification::QoSProperties qos;
00167 this->set_threads (qos, consumer_threads);
00168 properties->default_consumer_admin_qos_properties (qos);
00169 }
00170
00171
00172 {
00173 if (supplier_threads > 0)
00174 ACE_DEBUG((LM_DEBUG, "Using %d threads for each SupplierAdmin.\n", supplier_threads));
00175 CosNotification::QoSProperties qos;
00176 this->set_threads (qos, supplier_threads);
00177 properties->default_supplier_admin_qos_properties (qos);
00178 }
00179 }
00180 else
00181 {
00182
00183 {
00184 if (supplier_threads > 0)
00185 ACE_DEBUG((LM_DEBUG, "Using %d threads for each Supplier.\n", supplier_threads));
00186 CosNotification::QoSProperties qos;
00187 this->set_threads (qos, supplier_threads);
00188 properties->default_proxy_consumer_qos_properties (qos);
00189 }
00190
00191
00192 {
00193 if (consumer_threads > 0)
00194 ACE_DEBUG((LM_DEBUG, "Using %d threads for each Consumer.\n", consumer_threads));
00195 CosNotification::QoSProperties qos;
00196 this->set_threads (qos, consumer_threads);
00197 properties->default_proxy_supplier_qos_properties (qos);
00198 }
00199 }
00200
00201 return 0;
00202 }
00203
00204 void
00205 TAO_CosNotify_Service::set_threads (CosNotification::QoSProperties &qos, int threads)
00206 {
00207 NotifyExt::ThreadPoolParams tp_params =
00208 {NotifyExt::CLIENT_PROPAGATED, 0, 0, (unsigned)threads, 0, 0, 0, 0, 0 };
00209
00210 qos.length (1);
00211 qos[0].name = CORBA::string_dup (NotifyExt::ThreadPool);
00212 qos[0].value <<= tp_params;
00213 }
00214
00215 int
00216 TAO_CosNotify_Service::fini (void)
00217 {
00218 return 0;
00219 }
00220
00221 void
00222 TAO_CosNotify_Service::init_service (CORBA::ORB_ptr orb)
00223 {
00224 ACE_DEBUG ((LM_DEBUG, "Loading the Cos Notification Service...\n"));
00225
00226 if (TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb())
00227 {
00228
00229 if (0 == TAO_Notify_PROPERTIES::instance()->dispatching_orb())
00230 {
00231 ACE_DEBUG ((LM_DEBUG, "No dispatching orb supplied. Creating default one.\n"));
00232
00233 int argc = 0;
00234 char *argv0 = 0;
00235 char **argv = &argv0;
00236 CORBA::ORB_var dispatcher = CORBA::ORB_init (argc, argv,
00237 "default_dispatcher");
00238
00239 TAO_Notify_PROPERTIES::instance()->dispatching_orb(dispatcher.in());
00240 }
00241
00242 this->init_i2 (orb, TAO_Notify_PROPERTIES::instance()->dispatching_orb());
00243
00244 }
00245 else
00246 {
00247 this->init_i (orb);
00248 }
00249 }
00250
00251 void
00252 TAO_CosNotify_Service::init_service2 (CORBA::ORB_ptr orb, CORBA::ORB_ptr dispatching_orb)
00253 {
00254 this->init_i2 (orb, dispatching_orb);
00255 }
00256
00257 void
00258 TAO_CosNotify_Service::finalize_service (
00259 CosNotifyChannelAdmin::EventChannelFactory_ptr factory)
00260 {
00261
00262 if (CORBA::is_nil (factory))
00263 return;
00264
00265
00266 CosNotifyChannelAdmin::EventChannelFactory_var ecf =
00267 CosNotifyChannelAdmin::EventChannelFactory::_duplicate (factory);
00268
00269
00270 CosNotifyChannelAdmin::ChannelIDSeq_var channels =
00271 ecf->get_all_channels ();
00272 CORBA::ULong length = channels->length ();
00273 for(CORBA::ULong i = 0; i < length; i++)
00274 {
00275 try
00276 {
00277 CosNotifyChannelAdmin::EventChannel_var ec =
00278 ecf->get_event_channel (channels[i]);
00279 if (!CORBA::is_nil (ec.in ()))
00280 {
00281 TAO_Notify_EventChannel* nec =
00282 dynamic_cast<TAO_Notify_EventChannel*> (ec->_servant ());
00283 if (nec != 0)
00284 nec->destroy ();
00285 }
00286 }
00287 catch (const CORBA::Exception&)
00288 {
00289
00290 }
00291 }
00292 }
00293
00294 void
00295 TAO_CosNotify_Service::init_i (CORBA::ORB_ptr orb)
00296 {
00297
00298 CORBA::Object_var object =
00299 orb->resolve_initial_references("RootPOA");
00300
00301 if (CORBA::is_nil (object.in ()))
00302 ACE_ERROR ((LM_ERROR,
00303 " (%P|%t) Unable to resolve the RootPOA.\n"));
00304
00305 PortableServer::POA_var default_poa = PortableServer::POA::_narrow (object.in ());
00306
00307
00308 TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance();
00309
00310 properties->orb (orb);
00311 properties->default_poa (default_poa.in ());
00312
00313
00314 this->factory_.reset (this->create_factory ());
00315 ACE_ASSERT( this->factory_.get() != 0 );
00316 TAO_Notify_PROPERTIES::instance()->factory (this->factory_.get());
00317
00318 this->builder_.reset (this->create_builder ());
00319 ACE_ASSERT( this->builder_.get() != 0 );
00320 TAO_Notify_PROPERTIES::instance()->builder (this->builder_.get());
00321 }
00322
00323 void
00324 TAO_CosNotify_Service::init_i2 (CORBA::ORB_ptr orb, CORBA::ORB_ptr dispatching_orb)
00325 {
00326
00327 CORBA::Object_var object =
00328 orb->resolve_initial_references("RootPOA");
00329
00330 if (CORBA::is_nil (object.in ()))
00331 ACE_ERROR ((LM_ERROR, " (%P|%t) Unable to resolve the RootPOA.\n"));
00332
00333 PortableServer::POA_var default_poa = PortableServer::POA::_narrow (object.in ());
00334
00335
00336 TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance();
00337
00338 properties->orb (orb);
00339 properties->dispatching_orb (dispatching_orb);
00340 properties->separate_dispatching_orb (true);
00341
00342 properties->default_poa (default_poa.in ());
00343
00344
00345 this->factory_.reset (this->create_factory ());
00346 ACE_ASSERT( this->factory_.get() != 0 );
00347 TAO_Notify_PROPERTIES::instance()->factory (this->factory_.get());
00348
00349 this->builder_.reset (this->create_builder ());
00350 ACE_ASSERT( this->builder_.get() != 0 );
00351 TAO_Notify_PROPERTIES::instance()->builder (this->builder_.get());
00352 }
00353
00354 TAO_Notify_Factory*
00355 TAO_CosNotify_Service::create_factory (void)
00356 {
00357 TAO_Notify_Factory* factory = ACE_Dynamic_Service<TAO_Notify_Factory>::instance ("TAO_Notify_Factory");
00358 if (factory == 0)
00359 {
00360 ACE_NEW_THROW_EX (factory,
00361 TAO_Notify_Default_Factory (),
00362 CORBA::NO_MEMORY ());
00363 }
00364 return factory;
00365 }
00366
00367 TAO_Notify_Builder*
00368 TAO_CosNotify_Service::create_builder (void)
00369 {
00370 TAO_Notify_Builder* builder = 0;
00371 ACE_NEW_THROW_EX (builder,
00372 TAO_Notify_Builder (),
00373 CORBA::NO_MEMORY ());
00374
00375 return builder;
00376 }
00377
00378 CosNotifyChannelAdmin::EventChannelFactory_ptr
00379 TAO_CosNotify_Service::create (PortableServer::POA_ptr poa,
00380 const char* factory_name)
00381 {
00382 return this->builder().build_event_channel_factory (poa, factory_name);
00383 }
00384
00385 void
00386 TAO_CosNotify_Service::remove (TAO_Notify_EventChannelFactory* )
00387 {
00388
00389 }
00390
00391 TAO_Notify_Factory&
00392 TAO_CosNotify_Service::factory (void)
00393 {
00394 ACE_ASSERT( this->factory_.get() != 0 );
00395 return *this->factory_;
00396 }
00397
00398 TAO_Notify_Builder&
00399 TAO_CosNotify_Service::builder (void)
00400 {
00401 ACE_ASSERT( this->builder_.get() != 0 );
00402 return *this->builder_;
00403 }
00404
00405 TAO_END_VERSIONED_NAMESPACE_DECL
00406
00407
00408
00409
00410 ACE_STATIC_SVC_DEFINE (TAO_Notify_Default_EMO_Factory_OLD,
00411 ACE_TEXT (TAO_NOTIFY_DEF_EMO_FACTORY_NAME),
00412 ACE_SVC_OBJ_T,
00413 &ACE_SVC_NAME (TAO_CosNotify_Service),
00414 ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ,
00415 0)
00416
00417
00418
00419 ACE_STATIC_SVC_DEFINE (TAO_CosNotify_Service,
00420 ACE_TEXT (TAO_COS_NOTIFICATION_SERVICE_NAME),
00421 ACE_SVC_OBJ_T,
00422 &ACE_SVC_NAME (TAO_CosNotify_Service),
00423 ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ,
00424 0)
00425
00426
00427 ACE_FACTORY_DEFINE (TAO_Notify_Serv, TAO_CosNotify_Service)
00428
00429