00001
00002
00003 #include "orbsvcs/Notify/CosNotify_Service.h"
00004 #include "orbsvcs/Notify/Properties.h"
00005 #include "orbsvcs/Notify/Default_Factory.h"
00006 #include "orbsvcs/Notify/Builder.h"
00007 #include "orbsvcs/Notify/EventChannel.h"
00008 #include "ace/Sched_Params.h"
00009 #include "ace/Arg_Shifter.h"
00010 #include "ace/Dynamic_Service.h"
00011 #include "tao/ORB_Core.h"
00012 #include "orbsvcs/NotifyExtC.h"
00013 #include "tao/debug.h"
00014
00015 ACE_RCSID (Notify,
00016 TAO_CosNotify_Service,
00017 "$Id: CosNotify_Service.cpp 85185 2009-04-28 08:53:12Z johnnyw $")
00018
00019 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00020
00021 TAO_CosNotify_Service::TAO_CosNotify_Service (void)
00022 {
00023 }
00024
00025 TAO_CosNotify_Service::~TAO_CosNotify_Service (void)
00026 {
00027 }
00028
00029 int
00030 TAO_CosNotify_Service::init (int argc, ACE_TCHAR *argv[])
00031 {
00032 ACE_Arg_Shifter arg_shifter (argc, argv);
00033
00034 const ACE_TCHAR *current_arg = 0;
00035
00036
00037 int ec_threads = 0;
00038 int consumer_threads = 0;
00039 int supplier_threads = 0;
00040
00041 bool task_per_proxy = false;
00042
00043 TAO_Notify_Properties *properties = TAO_Notify_PROPERTIES::instance();
00044
00045 while (arg_shifter.is_anything_left ())
00046 {
00047 if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-MTDispatching")) == 0)
00048 {
00049 arg_shifter.consume_arg ();
00050 ACE_DEBUG ((LM_DEBUG,
00051 ACE_TEXT ("(%P|%t) The -MTDispatching option has been deprecated, use -DispatchingThreads\n")));
00052 }
00053 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-DispatchingThreads"))))
00054 {
00055 consumer_threads += ACE_OS::atoi (current_arg);
00056 arg_shifter.consume_arg ();
00057 }
00058 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-MTSourceEval")) == 0)
00059 {
00060 arg_shifter.consume_arg ();
00061 ACE_DEBUG ((LM_DEBUG,
00062 ACE_TEXT ("(%P|%t) The -MTSourceEval option has been deprecated, use -SourceThreads\n")));
00063 }
00064 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-SourceThreads"))))
00065 {
00066 supplier_threads += ACE_OS::atoi (current_arg);
00067 arg_shifter.consume_arg ();
00068 }
00069 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-MTLookup")) == 0)
00070 {
00071 arg_shifter.consume_arg ();
00072 ACE_DEBUG ((LM_DEBUG,
00073 ACE_TEXT ("(%P|%t) The -MTLookup option has been deprecated, use -SourceThreads\n")));
00074 }
00075 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-LookupThreads"))))
00076 {
00077 supplier_threads += ACE_OS::atoi (current_arg);
00078 arg_shifter.consume_arg ();
00079 ACE_DEBUG ((LM_DEBUG,
00080 ACE_TEXT ("(%P|%t) The -LookupThreads option has been deprecated, use -SourceThreads\n")));
00081 }
00082 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-MTListenerEval")) == 0)
00083 {
00084 arg_shifter.consume_arg ();
00085 ACE_DEBUG ((LM_DEBUG,
00086 ACE_TEXT ("(%P|%t) The -MTListenerEval option has been deprecated, use -DispatchingThreads\n")));
00087 }
00088 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-ListenerThreads"))))
00089 {
00090
00091
00092 ACE_DEBUG ((LM_DEBUG,
00093 ACE_TEXT ("(%P|%t) The -ListenerThreads option has been deprecated, use -DispatchingThreads\n")));
00094 consumer_threads += ACE_OS::atoi (current_arg);
00095 arg_shifter.consume_arg ();
00096 }
00097 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-AsynchUpdates")) == 0)
00098 {
00099 arg_shifter.consume_arg ();
00100
00101 properties->asynch_updates (1);
00102 }
00103 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-NoUpdates")) == 0)
00104 {
00105 arg_shifter.consume_arg ();
00106
00107 properties->updates (0);
00108 }
00109 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-AllocateTaskperProxy")) == 0)
00110 {
00111 task_per_proxy = true;
00112 arg_shifter.consume_arg ();
00113 }
00114 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-UseSeparateDispatchingORB")) == 0)
00115 {
00116 current_arg = arg_shifter.get_the_parameter
00117 (ACE_TEXT("-UseSeparateDispatchingORB"));
00118 if (current_arg != 0 &&
00119 (ACE_OS::strcmp(ACE_TEXT ("0"), current_arg) == 0 ||
00120 ACE_OS::strcmp(ACE_TEXT ("1"), current_arg) == 0))
00121 {
00122 properties->separate_dispatching_orb (
00123 static_cast<bool> (ACE_OS::atoi(current_arg)));
00124 ACE_DEBUG ((LM_DEBUG,
00125 ACE_TEXT ("Using separate Dispatching ORB\n")));
00126 }
00127 else
00128 {
00129 ACE_DEBUG ((LM_DEBUG,
00130 ACE_TEXT ("(%P|%t) WARNING: Unrecognized ")
00131 ACE_TEXT ("argument (%s). Ignoring invalid ")
00132 ACE_TEXT ("-UseSeparateDispatchingORB usage.\n"),
00133 (current_arg == 0 ? ACE_TEXT ("''") : current_arg)));
00134 }
00135 if (current_arg != 0)
00136 arg_shifter.consume_arg ();
00137 }
00138 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-AllowReconnect")) == 0)
00139 {
00140 arg_shifter.consume_arg ();
00141 TAO_Notify_PROPERTIES::instance()->allow_reconnect (true);
00142 }
00143 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-DefaultConsumerAdminFilterOp")) == 0)
00144 {
00145 current_arg = arg_shifter.get_the_parameter
00146 (ACE_TEXT("-DefaultConsumerAdminFilterOp"));
00147 CosNotifyChannelAdmin::InterFilterGroupOperator op = CosNotifyChannelAdmin::OR_OP;
00148 if (current_arg != 0 && (ACE_OS::strcmp(ACE_TEXT ("AND"), current_arg) == 0))
00149 op = CosNotifyChannelAdmin::AND_OP;
00150 else if (current_arg != 0 && (ACE_OS::strcmp(ACE_TEXT ("OR"), current_arg) == 0))
00151 op = CosNotifyChannelAdmin::OR_OP;
00152 else
00153 {
00154 ACE_DEBUG ((LM_DEBUG,
00155 ACE_TEXT ("(%P|%t) WARNING: Unrecognized ")
00156 ACE_TEXT ("argument (%s). Ignoring invalid ")
00157 ACE_TEXT ("-DefaultConsumerAdminFilterOp usage.\n"),
00158 (current_arg == 0 ? ACE_TEXT ("''") : current_arg)));
00159 }
00160 properties->defaultConsumerAdminFilterOp (op);
00161 arg_shifter.consume_arg ();
00162 }
00163 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-DefaultSupplierAdminFilterOp")) == 0)
00164 {
00165 current_arg = arg_shifter.get_the_parameter
00166 (ACE_TEXT("-DefaultSupplierAdminFilterOp"));
00167 CosNotifyChannelAdmin::InterFilterGroupOperator op = CosNotifyChannelAdmin::OR_OP;
00168 if (current_arg != 0 && (ACE_OS::strcmp(ACE_TEXT ("AND"), current_arg) == 0))
00169 op = CosNotifyChannelAdmin::AND_OP;
00170 else if (current_arg != 0 && (ACE_OS::strcmp(ACE_TEXT ("OR"), current_arg) == 0))
00171 op = CosNotifyChannelAdmin::OR_OP;
00172 else
00173 {
00174 ACE_DEBUG ((LM_DEBUG,
00175 ACE_TEXT ("(%P|%t) WARNING: Unrecognized ")
00176 ACE_TEXT ("argument (%s). Ignoring invalid ")
00177 ACE_TEXT ("-DefaultSupplierAdminFilterOp usage.\n"),
00178 (current_arg == 0 ? ACE_TEXT ("''") : current_arg)));
00179 }
00180 properties->defaultSupplierAdminFilterOp (op);
00181 arg_shifter.consume_arg ();
00182 }
00183 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-ValidateClient")) == 0)
00184 {
00185 arg_shifter.consume_arg ();
00186 TAO_Notify_PROPERTIES::instance()->validate_client (true);
00187 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Using reactive client control.\n")));
00188 }
00189 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-ValidateClientDelay")) == 0)
00190 {
00191 current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-ValidateClientDelay"));
00192 if (current_arg != 0)
00193 {
00194 ACE_Time_Value tv (ACE_OS::atoi (current_arg));
00195 TAO_Notify_PROPERTIES::instance()->validate_client_delay (tv);
00196 }
00197 else
00198 {
00199 ACE_DEBUG ((LM_DEBUG,
00200 ACE_TEXT ("(%P|%t) WARNING: Unrecognized ")
00201 ACE_TEXT ("argument (%s). Ignoring invalid ")
00202 ACE_TEXT ("-ValidateClientDelay usage.\n"),
00203 (current_arg == 0 ? ACE_TEXT ("''") : current_arg)));
00204 }
00205 if (current_arg != 0)
00206 arg_shifter.consume_arg ();
00207 }
00208 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-ValidateClientInterval")) == 0)
00209 {
00210 current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-ValidateClientInterval"));
00211 if (current_arg != 0)
00212 {
00213 ACE_Time_Value tv (ACE_OS::atoi (current_arg));
00214 TAO_Notify_PROPERTIES::instance()->validate_client_interval (tv);
00215 }
00216 else
00217 {
00218 ACE_DEBUG ((LM_DEBUG,
00219 ACE_TEXT ("(%P|%t) WARNING: Unrecognized ")
00220 ACE_TEXT ("argument (%s). Ignoring invalid ")
00221 ACE_TEXT ("-ValidateClientDelay usage.\n"),
00222 (current_arg == 0 ? ACE_TEXT ("''") : current_arg)));
00223 }
00224 if (current_arg != 0)
00225 arg_shifter.consume_arg ();
00226 }
00227 else
00228 {
00229 ACE_ERROR ((LM_ERROR,
00230 ACE_TEXT ("(%P|%t) Ignoring unknown option for Notify Factory: %s\n"),
00231 arg_shifter.get_current()
00232 ));
00233 arg_shifter.consume_arg ();
00234 }
00235 }
00236
00237
00238 {
00239 CosNotification::QoSProperties qos;
00240 this->set_threads (qos, ec_threads);
00241 properties->default_event_channel_qos_properties (qos);
00242 }
00243
00244 if (!task_per_proxy)
00245 {
00246
00247 {
00248 if (consumer_threads > 0)
00249 ACE_DEBUG((LM_DEBUG, "Using %d threads for each ConsumerAdmin.\n", consumer_threads));
00250 CosNotification::QoSProperties qos;
00251 this->set_threads (qos, consumer_threads);
00252 properties->default_consumer_admin_qos_properties (qos);
00253 }
00254
00255
00256 {
00257 if (supplier_threads > 0)
00258 ACE_DEBUG((LM_DEBUG, "Using %d threads for each SupplierAdmin.\n", supplier_threads));
00259 CosNotification::QoSProperties qos;
00260 this->set_threads (qos, supplier_threads);
00261 properties->default_supplier_admin_qos_properties (qos);
00262 }
00263 }
00264 else
00265 {
00266
00267 {
00268 if (supplier_threads > 0)
00269 ACE_DEBUG((LM_DEBUG, "Using %d threads for each Supplier.\n", supplier_threads));
00270 CosNotification::QoSProperties qos;
00271 this->set_threads (qos, supplier_threads);
00272 properties->default_proxy_consumer_qos_properties (qos);
00273 }
00274
00275
00276 {
00277 if (consumer_threads > 0)
00278 ACE_DEBUG((LM_DEBUG, "Using %d threads for each Consumer.\n", consumer_threads));
00279 CosNotification::QoSProperties qos;
00280 this->set_threads (qos, consumer_threads);
00281 properties->default_proxy_supplier_qos_properties (qos);
00282 }
00283 }
00284
00285 return 0;
00286 }
00287
00288 void
00289 TAO_CosNotify_Service::set_threads (CosNotification::QoSProperties &qos, int threads)
00290 {
00291 NotifyExt::ThreadPoolParams tp_params =
00292 {NotifyExt::CLIENT_PROPAGATED, 0, 0, (unsigned)threads, 0, 0, 0, 0, 0 };
00293
00294 qos.length (1);
00295 qos[0].name = CORBA::string_dup (NotifyExt::ThreadPool);
00296 qos[0].value <<= tp_params;
00297 }
00298
00299 int
00300 TAO_CosNotify_Service::fini (void)
00301 {
00302 if (TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb())
00303 {
00304 if (!CORBA::is_nil (TAO_Notify_PROPERTIES::instance()->dispatching_orb()))
00305 {
00306 CORBA::ORB_var dispatcher =
00307 TAO_Notify_PROPERTIES::instance()->dispatching_orb();
00308 dispatcher->shutdown ();
00309 dispatcher->destroy ();
00310 }
00311 }
00312
00313 TAO_Notify_Properties::instance()->close ();
00314 return 0;
00315 }
00316
00317 void
00318 TAO_CosNotify_Service::init_service (CORBA::ORB_ptr orb)
00319 {
00320 ACE_DEBUG ((LM_DEBUG, "Loading the Cos Notification Service...\n"));
00321
00322 if (TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb())
00323 {
00324
00325 if (CORBA::is_nil (TAO_Notify_PROPERTIES::instance()->dispatching_orb()))
00326 {
00327 ACE_DEBUG ((LM_DEBUG, "No dispatching orb supplied. Creating default one.\n"));
00328
00329 int argc = 0;
00330 ACE_TCHAR *argv0 = 0;
00331 ACE_TCHAR **argv = &argv0;
00332 CORBA::ORB_var dispatcher = CORBA::ORB_init (argc, argv,
00333 "default_dispatcher");
00334
00335 TAO_Notify_PROPERTIES::instance()->dispatching_orb(dispatcher.in());
00336 }
00337
00338 this->init_i2 (orb, TAO_Notify_PROPERTIES::instance()->dispatching_orb());
00339
00340 }
00341 else
00342 {
00343 this->init_i (orb);
00344 }
00345 }
00346
00347 void
00348 TAO_CosNotify_Service::init_service2 (CORBA::ORB_ptr orb, CORBA::ORB_ptr dispatching_orb)
00349 {
00350 this->init_i2 (orb, dispatching_orb);
00351 }
00352
00353 void
00354 TAO_CosNotify_Service::finalize_service (
00355 CosNotifyChannelAdmin::EventChannelFactory_ptr factory)
00356 {
00357
00358 if (CORBA::is_nil (factory))
00359 return;
00360
00361
00362 CosNotifyChannelAdmin::EventChannelFactory_var ecf =
00363 CosNotifyChannelAdmin::EventChannelFactory::_duplicate (factory);
00364
00365
00366 CosNotifyChannelAdmin::ChannelIDSeq_var channels =
00367 ecf->get_all_channels ();
00368 CORBA::ULong length = channels->length ();
00369 for(CORBA::ULong i = 0; i < length; i++)
00370 {
00371 try
00372 {
00373 CosNotifyChannelAdmin::EventChannel_var ec =
00374 ecf->get_event_channel (channels[i]);
00375 if (!CORBA::is_nil (ec.in ()))
00376 {
00377 TAO_Notify_EventChannel* nec =
00378 dynamic_cast<TAO_Notify_EventChannel*> (ec->_servant ());
00379 if (nec != 0)
00380 nec->destroy ();
00381 }
00382 }
00383 catch (const CORBA::Exception&)
00384 {
00385
00386 }
00387 }
00388
00389 TAO_Notify_EventChannelFactory* necf =
00390 dynamic_cast<TAO_Notify_EventChannelFactory*> (ecf->_servant ());
00391
00392 if (necf != 0)
00393 necf->stop_validator();
00394 }
00395
00396 void
00397 TAO_CosNotify_Service::init_i (CORBA::ORB_ptr orb)
00398 {
00399
00400 CORBA::Object_var object =
00401 orb->resolve_initial_references("RootPOA");
00402
00403 if (CORBA::is_nil (object.in ()))
00404 ACE_ERROR ((LM_ERROR,
00405 " (%P|%t) Unable to resolve the RootPOA.\n"));
00406
00407 PortableServer::POA_var default_poa = PortableServer::POA::_narrow (object.in ());
00408
00409
00410 TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance();
00411
00412 properties->orb (orb);
00413 properties->default_poa (default_poa.in ());
00414
00415
00416 this->factory_.reset (this->create_factory ());
00417 ACE_ASSERT( this->factory_.get() != 0 );
00418 TAO_Notify_PROPERTIES::instance()->factory (this->factory_.get());
00419
00420 this->builder_.reset (this->create_builder ());
00421 ACE_ASSERT( this->builder_.get() != 0 );
00422 TAO_Notify_PROPERTIES::instance()->builder (this->builder_.get());
00423 }
00424
00425 void
00426 TAO_CosNotify_Service::init_i2 (CORBA::ORB_ptr orb, CORBA::ORB_ptr dispatching_orb)
00427 {
00428
00429 CORBA::Object_var object =
00430 orb->resolve_initial_references("RootPOA");
00431
00432 if (CORBA::is_nil (object.in ()))
00433 ACE_ERROR ((LM_ERROR, " (%P|%t) Unable to resolve the RootPOA.\n"));
00434
00435 PortableServer::POA_var default_poa = PortableServer::POA::_narrow (object.in ());
00436
00437
00438 TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance();
00439
00440 properties->orb (orb);
00441 properties->dispatching_orb (dispatching_orb);
00442 properties->separate_dispatching_orb (true);
00443
00444 properties->default_poa (default_poa.in ());
00445
00446
00447 this->factory_.reset (this->create_factory ());
00448 ACE_ASSERT( this->factory_.get() != 0 );
00449 TAO_Notify_PROPERTIES::instance()->factory (this->factory_.get());
00450
00451 this->builder_.reset (this->create_builder ());
00452 ACE_ASSERT( this->builder_.get() != 0 );
00453 TAO_Notify_PROPERTIES::instance()->builder (this->builder_.get());
00454 }
00455
00456 TAO_Notify_Factory*
00457 TAO_CosNotify_Service::create_factory (void)
00458 {
00459 TAO_Notify_Factory* factory = ACE_Dynamic_Service<TAO_Notify_Factory>::instance ("TAO_Notify_Factory");
00460 if (factory == 0)
00461 {
00462 ACE_NEW_THROW_EX (factory,
00463 TAO_Notify_Default_Factory (),
00464 CORBA::NO_MEMORY ());
00465 }
00466 return factory;
00467 }
00468
00469 TAO_Notify_Builder*
00470 TAO_CosNotify_Service::create_builder (void)
00471 {
00472 TAO_Notify_Builder* builder = 0;
00473 ACE_NEW_THROW_EX (builder,
00474 TAO_Notify_Builder (),
00475 CORBA::NO_MEMORY ());
00476
00477 return builder;
00478 }
00479
00480 CosNotifyChannelAdmin::EventChannelFactory_ptr
00481 TAO_CosNotify_Service::create (PortableServer::POA_ptr poa,
00482 const char* factory_name)
00483 {
00484 return this->builder().build_event_channel_factory (poa, factory_name);
00485 }
00486
00487 void
00488 TAO_CosNotify_Service::remove (TAO_Notify_EventChannelFactory* )
00489 {
00490
00491 }
00492
00493 TAO_Notify_Factory&
00494 TAO_CosNotify_Service::factory (void)
00495 {
00496 ACE_ASSERT( this->factory_.get() != 0 );
00497 return *this->factory_;
00498 }
00499
00500 TAO_Notify_Builder&
00501 TAO_CosNotify_Service::builder (void)
00502 {
00503 ACE_ASSERT( this->builder_.get() != 0 );
00504 return *this->builder_;
00505 }
00506
00507 TAO_END_VERSIONED_NAMESPACE_DECL
00508
00509
00510
00511 ACE_STATIC_SVC_DEFINE (TAO_Notify_Default_EMO_Factory_OLD,
00512 ACE_TEXT (TAO_NOTIFY_DEF_EMO_FACTORY_NAME),
00513 ACE_SVC_OBJ_T,
00514 &ACE_SVC_NAME (TAO_CosNotify_Service),
00515 ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ,
00516 0)
00517
00518
00519
00520 ACE_STATIC_SVC_DEFINE (TAO_CosNotify_Service,
00521 ACE_TEXT (TAO_COS_NOTIFICATION_SERVICE_NAME),
00522 ACE_SVC_OBJ_T,
00523 &ACE_SVC_NAME (TAO_CosNotify_Service),
00524 ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ,
00525 0)
00526
00527
00528 ACE_FACTORY_DEFINE (TAO_Notify_Serv, TAO_CosNotify_Service)
00529
00530