00001
00002
00003 #include "ImR_Locator_i.h"
00004 #include "utils.h"
00005 #include "Iterator.h"
00006 #include "INS_Locator.h"
00007
00008 #include "orbsvcs/Time_Utilities.h"
00009
00010 #include "tao/IORTable/IORTable.h"
00011 #include "tao/PortableServer/PortableServer.h"
00012 #include "tao/ORB_Core.h"
00013 #include "tao/default_ports.h"
00014 #include "tao/Messaging/Messaging.h"
00015 #include "tao/AnyTypeCode/Any.h"
00016
00017 #include "ace/ARGV.h"
00018 #include "ace/OS_NS_sys_time.h"
00019 #include "ace/Vector_T.h"
00020
00021 static const int DEFAULT_START_LIMIT = 1;
00022
00023 static const int PING_RETRY_SCHEDULE[] = {0, 10, 100, 500, 1000, 1000, 1000, 1000, 5000, 5000};
00024
00025 static const ACE_Time_Value DEFAULT_SERVER_TIMEOUT (0, 10 * 1000);
00026
00027
00028
00029
00030 static const ACE_Time_Value DEFAULT_SHUTDOWN_TIMEOUT (0, 5000 * 1000);
00031
00032 static PortableServer::POA_ptr
00033 createPersistentPOA (PortableServer::POA_ptr root_poa, const char* poa_name) {
00034
00035 PortableServer::LifespanPolicy_var life =
00036 root_poa->create_lifespan_policy (PortableServer::PERSISTENT);
00037
00038 PortableServer::IdAssignmentPolicy_var assign =
00039 root_poa->create_id_assignment_policy (PortableServer::USER_ID);
00040
00041 CORBA::PolicyList pols;
00042 pols.length (2);
00043 pols[0] = PortableServer::LifespanPolicy::_duplicate (life.in ());
00044 pols[1] = PortableServer::IdAssignmentPolicy::_duplicate (assign.in ());
00045
00046 PortableServer::POAManager_var mgr = root_poa->the_POAManager ();
00047 PortableServer::POA_var poa =
00048 root_poa->create_POA (poa_name, mgr.in (), pols);
00049
00050 life->destroy ();
00051 assign->destroy ();
00052
00053 return poa._retn ();
00054 }
00055
00056 ImR_Locator_i::ImR_Locator_i (void)
00057 : forwarder_ (*this)
00058 , ins_locator_ (0)
00059 , debug_ (0)
00060 , read_only_ (false)
00061 {
00062
00063
00064
00065 INS_Locator* locator;
00066 ACE_NEW (locator,
00067 INS_Locator (*this));
00068 ins_locator_ = locator;
00069 }
00070
00071 ImR_Locator_i::~ImR_Locator_i (void)
00072 {
00073
00074
00075
00076
00077
00078 }
00079
00080 int
00081 ImR_Locator_i::init_with_orb (CORBA::ORB_ptr orb, Options& opts)
00082 {
00083 orb_ = CORBA::ORB::_duplicate (orb);
00084 debug_ = opts.debug ();
00085 read_only_ = opts.readonly ();
00086 startup_timeout_ = opts.startup_timeout ();
00087 ping_interval_ = opts.ping_interval ();
00088
00089 CORBA::Object_var obj =
00090 this->orb_->resolve_initial_references ("RootPOA");
00091 this->root_poa_ = PortableServer::POA::_narrow (obj.in ());
00092 ACE_ASSERT (! CORBA::is_nil (this->root_poa_.in ()));
00093
00094 this->forwarder_.init (orb);
00095 this->adapter_.init (& this->forwarder_);
00096
00097
00098
00099 root_poa_->the_activator (&this->adapter_);
00100
00101
00102 this->imr_poa_ = createPersistentPOA (this->root_poa_.in (),
00103 "ImplRepo_Service");
00104 ACE_ASSERT (! CORBA::is_nil (this->imr_poa_.in ()));
00105
00106 waiter_svt_.debug (debug_ > 1);
00107 PortableServer::ObjectId_var id = PortableServer::string_to_ObjectId ("ImR_AsyncStartupWaiter");
00108 this->imr_poa_->activate_object_with_id (id.in (), &waiter_svt_);
00109 obj = this->imr_poa_->id_to_reference (id.in ());
00110 if (startup_timeout_ > ACE_Time_Value::zero)
00111 {
00112 obj = set_timeout_policy (obj.in (), startup_timeout_);
00113 }
00114 waiter_ = ImplementationRepository::AsyncStartupWaiter::_narrow (obj.in ());
00115
00116 id = PortableServer::string_to_ObjectId ("ImplRepo_Service");
00117 this->imr_poa_->activate_object_with_id (id.in (), this);
00118
00119 obj = this->imr_poa_->id_to_reference (id.in ());
00120 CORBA::String_var ior = this->orb_->object_to_string (obj.in ());
00121
00122
00123 obj = orb->resolve_initial_references ("IORTable");
00124 IORTable::Table_var ior_table = IORTable::Table::_narrow (obj.in ());
00125 ACE_ASSERT (! CORBA::is_nil (ior_table.in ()));
00126 ior_table->bind ("ImplRepoService", ior.in ());
00127 ior_table->bind ("ImR", ior.in ());
00128 ior_table->set_locator (this->ins_locator_.in ());
00129
00130
00131 if (opts.multicast ())
00132 {
00133 ACE_Reactor* reactor = orb->orb_core ()->reactor ();
00134 if (this->setup_multicast (reactor, ior.in ()) != 0)
00135 return -1;
00136 }
00137
00138
00139
00140
00141
00142 int init_result =
00143 this->repository_.init (opts);
00144 if (init_result == -1)
00145 {
00146 ACE_ERROR_RETURN ((LM_ERROR, "Repository failed to initialize\n"), -1);
00147 }
00148
00149
00150 PortableServer::POAManager_var poaman =
00151 this->root_poa_->the_POAManager ();
00152 poaman->activate ();
00153 poaman = this->imr_poa_->the_POAManager ();
00154 poaman->activate ();
00155
00156
00157 if (opts.ior_filename ().length () > 0)
00158 {
00159 FILE* fp = ACE_OS::fopen (opts.ior_filename ().c_str (), "w");
00160 if (fp == 0)
00161 {
00162 ACE_ERROR_RETURN ((LM_ERROR,
00163 "ImR: Could not open file: %s\n", opts.ior_filename ().c_str ()), -1);
00164 }
00165 ACE_OS::fprintf (fp, "%s", ior.in ());
00166 ACE_OS::fclose (fp);
00167 }
00168
00169 return 0;
00170 }
00171
00172 int
00173 ImR_Locator_i::init (Options& opts)
00174 {
00175 ACE_CString cmdline = opts.cmdline ();
00176 cmdline += " -orbcollocation no -orbuseimr 0";
00177 ACE_ARGV av (cmdline.c_str ());
00178 int argc = av.argc ();
00179 char** argv = av.argv ();
00180
00181 CORBA::ORB_var orb = CORBA::ORB_init (argc, argv, "TAO_ImR_Locator");
00182 int err = this->init_with_orb (orb.in (), opts);
00183 return err;
00184 }
00185
00186 int
00187 ImR_Locator_i::run (void)
00188 {
00189 if (debug_ > 0)
00190 {
00191
00192
00193
00194
00195
00196 ACE_DEBUG ((LM_DEBUG,
00197 "Implementation Repository: Running\n"
00198 "\tPing Interval : %dms\n"
00199 "\tStartup Timeout : %ds\n"
00200 "\tPersistence : %s\n"
00201 "\tMulticast : %s\n",
00202 ping_interval_.msec (),
00203 startup_timeout_.sec (),
00204 repository_.repo_mode (),
00205 ior_multicast_.reactor () != 0 ? "Enabled" : "Disabled"));
00206 ACE_DEBUG ((LM_DEBUG,
00207 "\tDebug : %d\n"
00208 "\tLocked : %s\n\n",
00209 debug (),
00210 read_only_ ? "True" : "False"));
00211 }
00212 this->auto_start_servers ();
00213
00214 this->orb_->run ();
00215 return 0;
00216 }
00217
00218 void
00219 ImR_Locator_i::shutdown (CORBA::Boolean activators, CORBA::Boolean servers)
00220 {
00221 if (servers != 0 && this->repository_.servers ().current_size () > 0)
00222 {
00223
00224 ACE_ERROR ((LM_ERROR, "ImR: Shutdown of all servers not implemented.\n"));
00225 }
00226 if (activators != 0 && this->repository_.activators ().current_size () > 0)
00227 {
00228 ACE_Vector<ImplementationRepository::Activator_var> acts;
00229 Locator_Repository::AIMap::ENTRY* entry = 0;
00230 Locator_Repository::AIMap::ITERATOR it (this->repository_.activators ());
00231 for (;it.next (entry) != 0; it.advance ())
00232 {
00233 Activator_Info_Ptr info = entry->int_id_;
00234 ACE_ASSERT (! info.null ());
00235 connect_activator (*info);
00236 if (! CORBA::is_nil (info->activator.in ()))
00237 acts.push_back (info->activator);
00238 }
00239
00240 int shutdown_errs = 0;
00241
00242 for (size_t i = 0; i < acts.size (); ++i)
00243 {
00244 try
00245 {
00246 acts[i]->shutdown ();
00247 acts[i] = ImplementationRepository::Activator::_nil ();
00248 }
00249 catch (const CORBA::Exception& ex)
00250 {
00251 ++shutdown_errs;
00252 if (debug_ > 1)
00253 {
00254 ex._tao_print_exception (
00255 "ImR: shutdown activator");
00256 }
00257 }
00258 }
00259 if (debug_ > 0 && shutdown_errs > 0)
00260 {
00261 ACE_DEBUG ((LM_DEBUG, "ImR: Some activators could not be shut down.\n"));
00262 }
00263 }
00264
00265
00266 this->shutdown (false);
00267 }
00268
00269 void
00270 ImR_Locator_i::shutdown (bool wait_for_completion)
00271 {
00272 this->orb_->shutdown (wait_for_completion);
00273 }
00274
00275 int
00276 ImR_Locator_i::fini (void)
00277 {
00278 try
00279 {
00280 if (debug_ > 1)
00281 ACE_DEBUG ((LM_DEBUG, "ImR: Shutting down...\n"));
00282
00283 teardown_multicast ();
00284
00285 this->root_poa_->destroy (1, 1);
00286
00287 this->orb_->destroy ();
00288
00289 if (debug_ > 0)
00290 ACE_DEBUG ((LM_DEBUG, "ImR: Shut down successfully.\n"));
00291 }
00292 catch (const CORBA::Exception& ex)
00293 {
00294 ex._tao_print_exception ("ImR_Locator_i::fini");
00295 throw;
00296 }
00297 return 0;
00298 }
00299
00300 void
00301 ImR_Locator_i::teardown_multicast ()
00302 {
00303 ACE_Reactor* r = ior_multicast_.reactor ();
00304 if (r != 0) {
00305 r->remove_handler (&ior_multicast_, ACE_Event_Handler::READ_MASK);
00306 ior_multicast_.reactor (0);
00307 }
00308 }
00309
00310 int
00311 ImR_Locator_i::setup_multicast (ACE_Reactor* reactor, const char* ior)
00312 {
00313 ACE_ASSERT (reactor != 0);
00314 ACE_ASSERT (ior != 0);
00315 #if defined (ACE_HAS_IP_MULTICAST)
00316
00317 TAO_ORB_Core* core = TAO_ORB_Core_instance ();
00318
00319 ACE_CString mde (core->orb_params ()->mcast_discovery_endpoint ());
00320
00321 if (mde.length () != 0)
00322 {
00323 if (this->ior_multicast_.init (ior,
00324 mde.c_str (), TAO_SERVICEID_IMPLREPOSERVICE) == -1)
00325 {
00326 return -1;
00327 }
00328 }
00329 else
00330 {
00331
00332 CORBA::UShort port =
00333 core->orb_params ()->service_port (TAO::MCAST_IMPLREPOSERVICE);
00334 if (port == 0)
00335 {
00336
00337 const char* port_number = ACE_OS::getenv ("ImplRepoServicePort");
00338
00339 if (port_number != 0)
00340 port = static_cast<CORBA::UShort> (ACE_OS::atoi (port_number));
00341 }
00342 if (port == 0)
00343 port = TAO_DEFAULT_IMPLREPO_SERVER_REQUEST_PORT;
00344
00345 if (this->ior_multicast_.init (ior, port,
00346 ACE_DEFAULT_MULTICAST_ADDR, TAO_SERVICEID_IMPLREPOSERVICE) == -1)
00347 {
00348 return -1;
00349 }
00350 }
00351
00352
00353 if (reactor->register_handler (&this->ior_multicast_,
00354 ACE_Event_Handler::READ_MASK) == -1)
00355 {
00356 if (debug_ >= 1)
00357 ACE_DEBUG ((LM_DEBUG, "ImR: cannot register Event handler\n"));
00358 return -1;
00359 }
00360 #else
00361 ACE_UNUSED_ARG (reactor);
00362 ACE_UNUSED_ARG (ior);
00363 #endif
00364 return 0;
00365 }
00366
00367 CORBA::Long
00368 ImR_Locator_i::register_activator (const char* aname,
00369 ImplementationRepository::Activator_ptr activator)
00370 {
00371 ACE_ASSERT (aname != 0);
00372 ACE_ASSERT (! CORBA::is_nil (activator));
00373
00374
00375
00376 this->unregister_activator_i (aname);
00377
00378 CORBA::String_var ior =
00379 this->orb_->object_to_string (activator);
00380
00381 CORBA::Long token = ACE_OS::gettimeofday ().msec ();
00382
00383 int err = this->repository_.add_activator (aname, token, ior.in (), activator);
00384 ACE_ASSERT (err == 0);
00385 ACE_UNUSED_ARG (err);
00386
00387 if (this->debug_ > 0)
00388 ACE_DEBUG ((LM_DEBUG, "ImR: Activator registered for %s.\n", aname));
00389
00390 return token;
00391 }
00392
00393 void
00394 ImR_Locator_i::unregister_activator (const char* aname,
00395 CORBA::Long token)
00396 {
00397 ACE_ASSERT (aname != 0);
00398 Activator_Info_Ptr info = this->get_activator (aname);
00399
00400 if (! info.null ())
00401 {
00402 if (info->token != token && this->debug_ > 0)
00403 {
00404 ACE_DEBUG ((LM_DEBUG, "ImR: Ignoring unregister activator:%s. Wrong token.\n", aname));
00405 return;
00406 }
00407
00408 this->unregister_activator_i (aname);
00409
00410 if (this->debug_ > 0)
00411 ACE_DEBUG ((LM_DEBUG, "ImR: Activator %s unregistered.\n", aname));
00412 }
00413 else
00414 {
00415 if (this->debug_ > 0)
00416 ACE_DEBUG ((LM_DEBUG, "ImR: Ignoring unregister activator:%s. Unknown activator.\n", aname));
00417 }
00418 }
00419
00420 void
00421 ImR_Locator_i::unregister_activator_i (const char* aname)
00422 {
00423 ACE_ASSERT (aname != 0);
00424 int err = this->repository_.remove_activator (aname);
00425 ACE_UNUSED_ARG (err);
00426 }
00427
00428 void
00429 ImR_Locator_i::notify_child_death (const char* name)
00430 {
00431 ACE_ASSERT (name != 0);
00432
00433 if (this->debug_ > 1)
00434 ACE_DEBUG ((LM_DEBUG, "ImR: Server has died <%s>.\n", name));
00435
00436 Server_Info_Ptr info = this->repository_.get_server (name);
00437 if (! info.null ())
00438 {
00439 info->ior = "";
00440 info->partial_ior = "";
00441
00442 int err = this->repository_.update_server (*info);
00443 ACE_ASSERT (err == 0);
00444 ACE_UNUSED_ARG (err);
00445 }
00446 else
00447 {
00448 if (this->debug_ > 1)
00449 ACE_DEBUG ((LM_DEBUG,
00450 "ImR: Failed to find server in repository.\n"));
00451 }
00452 }
00453
00454 void
00455 ImR_Locator_i::activate_server (const char* server)
00456 {
00457 if (debug_ > 1)
00458 ACE_DEBUG ((LM_DEBUG, "ImR: Manually activating server <%s>\n", server));
00459
00460
00461
00462 activate_server_by_name (server, true);
00463 }
00464
00465 char*
00466 ImR_Locator_i::activate_server_by_name (const char* name, bool manual_start)
00467 {
00468
00469
00470 ACE_ASSERT (name != 0);
00471
00472 Server_Info_Ptr info = this->repository_.get_server (name);
00473 if (info.null ())
00474 {
00475 ACE_ERROR ((LM_ERROR, "ImR: Cannot find info for server <%s>\n", name));
00476 throw ImplementationRepository::NotFound ();
00477 }
00478
00479 return activate_server_i (*info, manual_start);
00480 }
00481
00482 char*
00483 ImR_Locator_i::activate_server_by_object (const char* object_name)
00484 {
00485 ACE_ASSERT (object_name != 0);
00486
00487
00488
00489 ACE_CString server_name (object_name);
00490 ACE_CString::size_type pos = server_name.find ('/');
00491 if (pos != ACE_CString::npos)
00492 server_name = server_name.substr (pos + 1);
00493
00494 return activate_server_by_name (server_name.c_str (), false);
00495 }
00496
00497 char*
00498 ImR_Locator_i::activate_server_i (Server_Info& info, bool manual_start)
00499 {
00500 if (info.activation_mode == ImplementationRepository::PER_CLIENT)
00501 {
00502 return activate_perclient_server_i (info, manual_start);
00503 }
00504
00505 while (true)
00506 {
00507 if (is_alive (info))
00508 {
00509 if (debug_ > 1)
00510 {
00511 ACE_DEBUG ((LM_DEBUG, "ImR: Successfully activated <%s> at \n\t%s\n",
00512 info.name.c_str (), info.partial_ior.c_str ()));
00513 }
00514 info.start_count = 0;
00515
00516 waiter_svt_.unblock_all (info.name.c_str ());
00517
00518 return CORBA::string_dup (info.partial_ior.c_str ());
00519 }
00520
00521 info.reset ();
00522
00523 if (! info.starting && info.start_count >= info.start_limit)
00524 {
00525 if (this->debug_ > 0)
00526 {
00527 ACE_DEBUG ((LM_DEBUG,
00528 "ImR: Cannot Activate <%s>.\n", info.name.c_str ()));
00529 }
00530
00531 waiter_svt_.unblock_all (info.name.c_str ());
00532
00533 throw ImplementationRepository::CannotActivate(
00534 CORBA::string_dup (
00535 "Cannot start server."));
00536 }
00537
00538
00539 ImplementationRepository::StartupInfo_var si =
00540 start_server (info, manual_start, info.waiting_clients);
00541 }
00542 }
00543
00544 char*
00545 ImR_Locator_i::activate_perclient_server_i (Server_Info info, bool manual_start)
00546 {
00547 Server_Info_Ptr shared_info = this->repository_.get_server (info.name);
00548 do
00549 {
00550 ImplementationRepository::StartupInfo* psi =
00551 start_server (info, manual_start, shared_info->waiting_clients);
00552
00553 if (psi != 0)
00554 {
00555 ImplementationRepository::StartupInfo_var si = psi;
00556 ACE_ASSERT (info.name == si->name.in ());
00557 info.partial_ior = si->partial_ior.in ();
00558 info.ior = si->ior.in ();
00559
00560 if (is_alive (info))
00561 {
00562 if (debug_ > 1)
00563 {
00564 ACE_DEBUG ((LM_DEBUG, "ImR: Successfully activated <%s> at \n\t%s\n",
00565 info.name.c_str (), info.partial_ior.c_str ()));
00566 }
00567 return CORBA::string_dup (info.partial_ior.c_str ());
00568 }
00569 info.reset ();
00570 }
00571 } while (info.start_count < info.start_limit);
00572
00573 if (this->debug_ > 0)
00574 {
00575 ACE_DEBUG ((LM_DEBUG,
00576 "ImR: Cannot Activate <%s>.\n", info.name.c_str ()));
00577 }
00578 throw ImplementationRepository::CannotActivate(
00579 CORBA::string_dup (
00580 "Cannot start server."));
00581 }
00582
00583 ImplementationRepository::StartupInfo*
00584 ImR_Locator_i::start_server (Server_Info& info, bool manual_start,
00585 int& waiting_clients)
00586 {
00587 if (info.activation_mode == ImplementationRepository::MANUAL && ! manual_start)
00588 {
00589 if (debug_ > 0)
00590 ACE_DEBUG ((LM_DEBUG, "ImR: Cannot start server <%s>. ActivationMode=MANUAL\n", info.name.c_str ()));
00591 throw ImplementationRepository::CannotActivate(
00592 CORBA::string_dup (
00593 "Cannot implicitly activate MANUAL server."));
00594 }
00595 if (info.cmdline.length () == 0)
00596 {
00597 if (debug_ > 0)
00598 ACE_DEBUG ((LM_DEBUG, "ImR: Cannot start server <%s>."
00599 " No command line.\n", info.name.c_str ()));
00600 throw ImplementationRepository::CannotActivate(
00601 CORBA::string_dup (
00602 "No command line registered for server."));
00603 }
00604
00605 Activator_Info_Ptr ainfo = get_activator (info.activator);
00606
00607 if (ainfo.null () || CORBA::is_nil (ainfo->activator.in ()))
00608 {
00609 if (debug_ > 0)
00610 ACE_DEBUG ((LM_DEBUG, "ImR: Cannot start server <%s>. "
00611 "Activator <%s> not found.\n", info.name.c_str (), info.activator.c_str ()));
00612 throw ImplementationRepository::CannotActivate(
00613 CORBA::string_dup (
00614 "No activator registered for server."));
00615 }
00616
00617 try
00618 {
00619 ++waiting_clients;
00620
00621 if (waiting_clients <= 1 ||
00622 info.activation_mode == ImplementationRepository::PER_CLIENT)
00623 {
00624 info.starting = true;
00625 ++info.start_count;
00626 ACE_ASSERT (info.start_count <= info.start_limit);
00627 if (this->debug_ > 0)
00628 {
00629 ACE_DEBUG ((LM_DEBUG, "ImR: Starting server <%s>. Attempt %d/%d.\n",
00630 info.name.c_str (), info.start_count, info.start_limit));
00631 }
00632 ainfo->activator->start_server (
00633 info.name.c_str (),
00634 info.cmdline.c_str (),
00635 info.dir.c_str (),
00636 info.env_vars);
00637 }
00638
00639 if (info.partial_ior.length () == 0)
00640 {
00641 if (this->debug_ > 0)
00642 {
00643 ACE_DEBUG ((LM_DEBUG, "ImR: Waiting for <%s> to start...\n", info.name.c_str ()));
00644 }
00645
00646 ImplementationRepository::StartupInfo_var si =
00647 waiter_->wait_for_startup (info.name.c_str ());
00648
00649 --waiting_clients;
00650 info.starting = false;
00651
00652 return si._retn ();
00653 }
00654 else
00655 {
00656 if (this->debug_ > 0)
00657 {
00658 ACE_DEBUG ((LM_DEBUG, "ImR: <%s> Skipping wait. Already started.\n", info.name.c_str ()));
00659 }
00660 --waiting_clients;
00661 info.starting = false;
00662 }
00663 }
00664 catch (const CORBA::TIMEOUT&)
00665 {
00666 --waiting_clients;
00667 info.starting = false;
00668
00669
00670
00671 if (info.partial_ior.length () == 0)
00672 {
00673 if (debug_ > 0)
00674 ACE_DEBUG ((LM_DEBUG, "ImR : Timeout waiting for <%s> to start.\n", info.name.c_str ()));
00675 info.reset ();
00676 }
00677 }
00678 catch (const ImplementationRepository::CannotActivate&)
00679 {
00680 --waiting_clients;
00681 info.starting = false;
00682 info.reset ();
00683 if (debug_ > 0)
00684 ACE_DEBUG ((LM_DEBUG, "ImR: Activator cannot start <%s>.\n", info.name.c_str ()));
00685 }
00686 catch (const CORBA::Exception& ex)
00687 {
00688 --waiting_clients;
00689 info.starting = false;
00690 if (debug_ > 0)
00691 ACE_DEBUG ((LM_DEBUG, "ImR: Unexpected exception while starting <%s>.\n", info.name.c_str ()));
00692 if (debug_ > 1)
00693 ex._tao_print_exception ("");
00694 info.reset ();
00695
00696
00697
00698 bool dead_activator = false;
00699 try
00700 {
00701 dead_activator = ainfo->activator->_non_existent ();
00702 }
00703 catch (const CORBA::Exception&)
00704 {
00705 dead_activator = true;
00706 }
00707
00708 if (dead_activator)
00709 {
00710
00711
00712
00713
00714 ainfo->reset ();
00715 }
00716 }
00717 return 0;
00718 }
00719
00720 CORBA::Object_ptr
00721 ImR_Locator_i::set_timeout_policy (CORBA::Object_ptr obj, const ACE_Time_Value& to)
00722 {
00723 CORBA::Object_var ret (CORBA::Object::_duplicate (obj));
00724
00725 try
00726 {
00727 TimeBase::TimeT timeout;
00728 ORBSVCS_Time::Time_Value_to_TimeT (timeout, to);
00729 CORBA::Any tmp;
00730 tmp <<= timeout;
00731
00732 CORBA::PolicyList policies (1);
00733 policies.length (1);
00734 policies[0] = orb_->create_policy (Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, tmp);
00735
00736 ret = obj->_set_policy_overrides (policies, CORBA::ADD_OVERRIDE);
00737
00738 policies[0]->destroy ();
00739
00740 if (CORBA::is_nil (ret.in ()))
00741 {
00742 if (this->debug_ > 0)
00743 {
00744 ACE_DEBUG ((LM_DEBUG, "ImR: Unable to set timeout policy.\n"));
00745 }
00746 ret = CORBA::Object::_duplicate (obj);
00747 }
00748 }
00749 catch (const CORBA::Exception& ex)
00750 {
00751 ex._tao_print_exception (
00752 "ImR_Locator_i::set_timeout_policy ()");
00753 }
00754
00755 return ret._retn ();
00756 }
00757
00758 void
00759 ImR_Locator_i::add_or_update_server (const char* server,
00760 const ImplementationRepository::StartupOptions &options)
00761 {
00762 ACE_ASSERT (server != 0);
00763
00764 if (this->read_only_)
00765 {
00766 ACE_DEBUG ((LM_DEBUG, "ImR: Cannot add/update server <%s> due to locked database.\n", server));
00767 throw CORBA::NO_PERMISSION (
00768 CORBA::SystemException::_tao_minor_code (
00769 TAO_IMPLREPO_MINOR_CODE,
00770 0),
00771 CORBA::COMPLETED_NO);
00772 }
00773
00774 if (debug_ > 0)
00775 ACE_DEBUG ((LM_DEBUG, "ImR: Add/Update server <%s>.\n", server));
00776
00777 int limit = options.start_limit;
00778 if (limit < 0)
00779 {
00780 limit = -limit;
00781 }
00782 else if (limit == 0)
00783 {
00784 limit = 1;
00785 }
00786
00787 Server_Info_Ptr info = this->repository_.get_server (server);
00788 if (info.null ())
00789 {
00790 if (this->debug_ > 1)
00791 ACE_DEBUG ((LM_DEBUG, "ImR: Adding server <%s>.\n", server));
00792
00793 this->repository_.add_server (server,
00794 options.activator.in (),
00795 options.command_line.in (),
00796 options.environment,
00797 options.working_directory.in (),
00798 options.activation,
00799 limit);
00800 }
00801 else
00802 {
00803 if (this->debug_ > 1)
00804 ACE_DEBUG ((LM_DEBUG, "ImR: Updating server <%s>.\n", server));
00805
00806 info->activator = options.activator.in ();
00807 info->cmdline = options.command_line.in ();
00808 info->env_vars = options.environment;
00809 info->dir = options.working_directory.in ();
00810 info->activation_mode = options.activation;
00811 info->start_limit = limit;
00812 info->start_count = 0;
00813 int err = this->repository_.update_server (*info);
00814 ACE_ASSERT (err == 0);
00815 ACE_UNUSED_ARG (err);
00816 }
00817
00818 if (this->debug_ > 1)
00819 {
00820
00821 ACE_DEBUG ((LM_DEBUG, "ImR: Server: %s\n"
00822 "\tActivator: %s\n"
00823 "\tCommand Line: %s\n"
00824 "\tWorking Directory: %s\n"
00825 "\tActivation: %s\n"
00826 "\tStart Limit: %d\n"
00827 "\n",
00828 server,
00829 options.activator.in (),
00830 options.command_line.in (),
00831 options.working_directory.in (),
00832 ImR_Utils::activationModeToString (options.activation).c_str (),
00833 limit
00834 ));
00835
00836 for (CORBA::ULong i = 0; i < options.environment.length (); ++i)
00837 ACE_DEBUG ((LM_DEBUG, "Environment variable %s=%s\n",
00838 options.environment[i].name.in (),
00839 options.environment[i].value.in ()));
00840 }
00841 }
00842
00843 void
00844 ImR_Locator_i::remove_server (const char* name)
00845 {
00846 ACE_ASSERT (name != 0);
00847 if (this->read_only_)
00848 {
00849 ACE_ERROR ((LM_ERROR,
00850 "ImR: Can't remove server <%s> due to locked database.\n", name));
00851 throw CORBA::NO_PERMISSION (
00852 CORBA::SystemException::_tao_minor_code (
00853 TAO_IMPLREPO_MINOR_CODE,
00854 0),
00855 CORBA::COMPLETED_NO);
00856 }
00857
00858
00859
00860
00861
00862 Server_Info_Ptr info = this->repository_.get_server (name);
00863 if (! info.null ())
00864 {
00865 if (this->repository_.remove_server (name) == 0)
00866 {
00867 if (this->debug_ > 1)
00868 ACE_DEBUG ((LM_DEBUG, "ImR: Removing Server <%s>...\n", name));
00869
00870 PortableServer::POA_var poa = findPOA (name);
00871 if (! CORBA::is_nil (poa.in ()))
00872 {
00873 bool etherealize = true;
00874 bool wait = false;
00875 poa->destroy (etherealize, wait);
00876 }
00877 if (this->debug_ > 0)
00878 ACE_DEBUG ((LM_DEBUG, "ImR: Removed Server <%s>.\n", name));
00879 }
00880 }
00881 else
00882 {
00883 ACE_ERROR ((LM_ERROR,
00884 "ImR: Can't remove unknown server <%s>.\n", name));
00885 throw ImplementationRepository::NotFound ();
00886 }
00887 }
00888
00889 PortableServer::POA_ptr
00890 ImR_Locator_i::findPOA (const char* name)
00891 {
00892 try
00893 {
00894 bool activate_it = false;
00895 return root_poa_->find_POA (name, activate_it);
00896 }
00897 catch (const CORBA::Exception&)
00898 {
00899 }
00900 return PortableServer::POA::_nil ();
00901 }
00902
00903 void
00904 ImR_Locator_i::shutdown_server (const char* server)
00905 {
00906 ACE_ASSERT (server != 0);
00907
00908 if (this->debug_ > 0)
00909 ACE_DEBUG ((LM_DEBUG, "ImR: Shutting down server <%s>.\n", server));
00910
00911 Server_Info_Ptr info = this->repository_.get_server (server);
00912 if (info.null ())
00913 {
00914 ACE_ERROR ((LM_ERROR,
00915 "ImR: shutdown_server () Cannot find info for server <%s>\n", server));
00916 throw ImplementationRepository::NotFound ();
00917 }
00918
00919 connect_server (*info);
00920
00921 if (CORBA::is_nil (info->server.in ()))
00922 {
00923 ACE_ERROR ((LM_ERROR,
00924 "ImR: shutdown_server () Cannot connect to server <%s>\n", server));
00925 throw ImplementationRepository::NotFound ();
00926 }
00927
00928 try
00929 {
00930 CORBA::Object_var obj = set_timeout_policy (info->server.in (), DEFAULT_SHUTDOWN_TIMEOUT);
00931 ImplementationRepository::ServerObject_var server =
00932 ImplementationRepository::ServerObject::_unchecked_narrow (obj.in ());
00933 server->shutdown ();
00934 }
00935 catch (const CORBA::TIMEOUT&)
00936 {
00937 info->reset ();
00938 int err = this->repository_.update_server (*info);
00939 ACE_ASSERT (err == 0);
00940 ACE_UNUSED_ARG (err);
00941
00942
00943 if (this->debug_ > 1)
00944 {
00945 ACE_DEBUG ((LM_DEBUG, "ImR: Timeout while waiting for <%s> shutdown.\n", server));
00946 }
00947 throw;
00948 }
00949 catch (const CORBA::Exception&)
00950 {
00951 if (this->debug_ > 1)
00952 {
00953 ACE_DEBUG ((LM_DEBUG, "ImR: Exception ignored while shutting down <%s>\n", server));
00954 }
00955 }
00956
00957
00958
00959 info->reset ();
00960
00961 int err = this->repository_.update_server (*info);
00962 ACE_ASSERT (err == 0);
00963 ACE_UNUSED_ARG (err);
00964 }
00965
00966 void
00967 ImR_Locator_i::server_is_running (const char* name,
00968 const char* partial_ior,
00969 ImplementationRepository::ServerObject_ptr server)
00970 {
00971 ACE_ASSERT (name != 0);
00972 ACE_ASSERT (partial_ior != 0);
00973 ACE_ASSERT (! CORBA::is_nil (server));
00974
00975 if (this->debug_ > 0)
00976 ACE_DEBUG ((LM_DEBUG, "ImR: Server %s is running at %s.\n", name, partial_ior));
00977
00978 CORBA::String_var ior = orb_->object_to_string (server);
00979
00980 if (this->debug_ > 1)
00981 ACE_DEBUG ((LM_DEBUG, "ImR: Server %s callback at %s.\n", name, ior.in ()));
00982
00983 Server_Info_Ptr info = this->repository_.get_server (name);
00984 if (info.null ())
00985 {
00986 if (this->debug_ > 0)
00987 ACE_DEBUG ((LM_DEBUG, "ImR: Auto adding NORMAL server <%s>.\n", name));
00988
00989 ImplementationRepository::EnvironmentList env (0);
00990 this->repository_.add_server (name,
00991 "",
00992 "",
00993 ImplementationRepository::EnvironmentList (),
00994 "",
00995 ImplementationRepository::NORMAL,
00996 DEFAULT_START_LIMIT,
00997 partial_ior,
00998 ior.in (),
00999 ImplementationRepository::ServerObject::_nil ()
01000 );
01001 }
01002 else
01003 {
01004 if (info->activation_mode != ImplementationRepository::PER_CLIENT) {
01005 info->ior = ior.in ();
01006 info->partial_ior = partial_ior;
01007 info->server = ImplementationRepository::ServerObject::_nil ();
01008
01009 int err = this->repository_.update_server (*info);
01010 ACE_ASSERT (err == 0);
01011 ACE_UNUSED_ARG (err);
01012
01013 waiter_svt_.unblock_one (name, partial_ior, ior.in (), false);
01014 } else {
01015
01016
01017 if (info->waiting_clients > 0)
01018 {
01019 waiter_svt_.unblock_one (name, partial_ior, ior.in (), true);
01020 }
01021 else if (this->debug_ > 1)
01022 {
01023 ACE_DEBUG ((LM_DEBUG,
01024 ACE_TEXT ("ImR - Ignoring server_is_running due to no ")
01025 ACE_TEXT ("waiting PER_CLIENT clients.\n")));
01026 }
01027 }
01028 }
01029 }
01030
01031 void
01032 ImR_Locator_i::server_is_shutting_down (const char* server)
01033 {
01034 ACE_ASSERT (server != 0);
01035 Server_Info_Ptr info = this->repository_.get_server (server);
01036 if (info.null ())
01037 {
01038 if (this->debug_ > 1)
01039 {
01040 ACE_DEBUG ((LM_DEBUG,
01041 "ImR_Locator_i::server_is_shutting_down: Unknown server:%s\n", server));
01042 }
01043 return;
01044 }
01045
01046 if (this->debug_ > 0)
01047 ACE_DEBUG ((LM_DEBUG, "ImR: Server <%s> is shutting down.\n", server));
01048
01049 info->reset ();
01050
01051 int err = this->repository_.update_server (*info);
01052 ACE_ASSERT (err == 0);
01053 ACE_UNUSED_ARG (err);
01054 }
01055
01056 void
01057 ImR_Locator_i::find (const char* server,
01058 ImplementationRepository::ServerInformation_out imr_info)
01059 {
01060 ACE_ASSERT (server != 0);
01061 ACE_NEW_THROW_EX (imr_info, ImplementationRepository::ServerInformation, CORBA::NO_MEMORY ());
01062
01063 Server_Info_Ptr info = this->repository_.get_server (server);
01064 if (! info.null ())
01065 {
01066 imr_info = info->createImRServerInfo ();
01067
01068 if (this->debug_ > 1)
01069 ACE_DEBUG ((LM_DEBUG, "ImR: Found server %s.\n", server));
01070 }
01071 else
01072 {
01073 if (debug_ > 1)
01074 ACE_DEBUG ((LM_DEBUG, "ImR: Cannot find server <%s>\n", server));
01075 }
01076 }
01077
01078 void
01079 ImR_Locator_i::list (CORBA::ULong how_many,
01080 ImplementationRepository::ServerInformationList_out server_list,
01081 ImplementationRepository::ServerInformationIterator_out server_iterator)
01082 {
01083 if (this->debug_ > 1)
01084 ACE_DEBUG ((LM_DEBUG, "ImR: List servers.\n"));
01085
01086
01087
01088 server_iterator = ImplementationRepository::ServerInformationIterator::_nil ();
01089 ACE_NEW_THROW_EX (server_list,
01090 ImplementationRepository::ServerInformationList (0), CORBA::NO_MEMORY ());
01091
01092 Locator_Repository::SIMap::ENTRY* entry = 0;
01093 Locator_Repository::SIMap::ITERATOR it (this->repository_.servers ());
01094
01095
01096 CORBA::ULong n = this->repository_.servers ().current_size ();
01097 if (how_many > 0 && n > how_many)
01098 {
01099 n = how_many;
01100 }
01101
01102 server_list->length (n);
01103
01104 if (this->debug_ > 1)
01105 ACE_DEBUG ((LM_DEBUG, "ImR_Locator_i::list: Filling ServerList with %d servers\n", n));
01106
01107 for (CORBA::ULong i = 0; i < n; i++)
01108 {
01109 it.next (entry);
01110 it.advance ();
01111 ACE_ASSERT (entry != 0);
01112
01113 Server_Info_Ptr info = entry->int_id_;
01114
01115 ImplementationRepository::ServerInformation_var imr_info = info->createImRServerInfo ();
01116 server_list[i] = *imr_info;
01117 }
01118
01119 if (this->repository_.servers ().current_size () > n)
01120 {
01121 if (this->debug_ > 1)
01122 ACE_DEBUG ((LM_DEBUG, "ImR_Locator_i::list: Creating ServerInformation Iterator\n"));
01123
01124 ImR_Iterator* imr_iter = 0;
01125
01126 ACE_NEW_THROW_EX (imr_iter,
01127 ImR_Iterator (n, this->repository_, this->imr_poa_.in ()),
01128 CORBA::NO_MEMORY ());
01129
01130 PortableServer::ServantBase_var tmp (imr_iter);
01131
01132 try
01133 {
01134 PortableServer::ObjectId_var id =
01135 this->imr_poa_->activate_object (imr_iter);
01136 CORBA::Object_var obj = this->imr_poa_->id_to_reference (id.in ());
01137 server_iterator = ImplementationRepository::
01138 ServerInformationIterator::_unchecked_narrow (obj.in ());
01139 }
01140 catch (const CORBA::Exception&)
01141 {
01142 throw;
01143 }
01144 }
01145 }
01146
01147 Activator_Info_Ptr
01148 ImR_Locator_i::get_activator (const ACE_CString& aname)
01149 {
01150 Activator_Info_Ptr info = this->repository_.get_activator (aname);
01151 if (! info.null ())
01152 {
01153 this->connect_activator (*info);
01154 }
01155 return info;
01156 }
01157
01158 void
01159 ImR_Locator_i::connect_activator (Activator_Info& info)
01160 {
01161 if (! CORBA::is_nil (info.activator.in ()) || info.ior.length () == 0)
01162 return;
01163
01164 try
01165 {
01166 CORBA::Object_var obj =
01167 this->orb_->string_to_object (info.ior.c_str ());
01168
01169 if (CORBA::is_nil (obj.in ()))
01170 {
01171 info.reset ();
01172 return;
01173 }
01174
01175 if (startup_timeout_ > ACE_Time_Value::zero)
01176 {
01177 obj = set_timeout_policy (obj.in (), startup_timeout_);
01178 }
01179
01180 info.activator =
01181 ImplementationRepository::Activator::_unchecked_narrow (obj.in ());
01182
01183 if (CORBA::is_nil (info.activator.in ()))
01184 {
01185 info.reset ();
01186 return;
01187 }
01188
01189 if (debug_ > 1)
01190 ACE_DEBUG ((LM_DEBUG, "ImR: Connected to activator <%s>\n", info.name.c_str ()));
01191 }
01192 catch (const CORBA::Exception&)
01193 {
01194 info.reset ();
01195 }
01196 }
01197
01198 void
01199 ImR_Locator_i::auto_start_servers (void)
01200 {
01201 if (this->repository_.servers ().current_size () == 0)
01202 return;
01203
01204 Locator_Repository::SIMap::ENTRY* server_entry;
01205 Locator_Repository::SIMap::ITERATOR server_iter (this->repository_.servers ());
01206
01207
01208
01209
01210 for (;server_iter.next (server_entry) != 0; server_iter.advance ())
01211 {
01212 Server_Info_Ptr info = server_entry->int_id_;
01213 ACE_ASSERT (! info.null ());
01214
01215 try
01216 {
01217 if (info->activation_mode == ImplementationRepository::AUTO_START
01218 && info->cmdline.length () > 0)
01219 {
01220 this->activate_server_i (*info, true);
01221 }
01222 }
01223 catch (const CORBA::Exception& ex)
01224 {
01225 if (this->debug_ > 1)
01226 {
01227 ACE_DEBUG ((LM_DEBUG,
01228 "ImR: AUTO_START Could not activate <%s>\n",
01229 server_entry->ext_id_.c_str ()));
01230 ex._tao_print_exception ("AUTO_START");
01231 }
01232
01233 }
01234 }
01235 }
01236
01237 void
01238 ImR_Locator_i::connect_server (Server_Info& info)
01239 {
01240 if (! CORBA::is_nil (info.server.in ()))
01241 {
01242 return;
01243 }
01244
01245 if (info.ior.length () == 0)
01246 {
01247 info.reset ();
01248 return;
01249 }
01250
01251 try
01252 {
01253 CORBA::Object_var obj = orb_->string_to_object (info.ior.c_str ());
01254
01255 if (CORBA::is_nil (obj.in ()))
01256 {
01257 info.reset ();
01258 return;
01259 }
01260
01261 obj = set_timeout_policy (obj.in (), DEFAULT_SERVER_TIMEOUT);
01262
01263 info.server =
01264 ImplementationRepository::ServerObject::_unchecked_narrow (obj.in ());
01265
01266 if (CORBA::is_nil (info.server.in ()))
01267 {
01268 info.reset ();
01269 return;
01270 }
01271
01272 if (debug_ > 1)
01273 ACE_DEBUG ((LM_DEBUG, "ImR: Connected to server <%s>\n", info.name.c_str ()));
01274 }
01275 catch (const CORBA::Exception&)
01276 {
01277 info.reset ();
01278 }
01279 }
01280
01281 bool
01282 ImR_Locator_i::is_alive (Server_Info& info)
01283 {
01284 const size_t table_size = sizeof (PING_RETRY_SCHEDULE) /
01285 sizeof (*PING_RETRY_SCHEDULE);
01286
01287 for (size_t i = 0; i < table_size; ++i)
01288 {
01289 int status = this->is_alive_i (info);
01290 if (status == 0)
01291 return false;
01292 if (status == 1)
01293 return true;
01294
01295
01296
01297
01298
01299
01300
01301
01302
01303
01304
01305
01306
01307
01308
01309 if (PING_RETRY_SCHEDULE[i] > 0)
01310 {
01311 ACE_Time_Value tv (0, PING_RETRY_SCHEDULE[i] * 1000);
01312 this->orb_->run (tv);
01313 }
01314 }
01315 if (debug_ > 0)
01316 {
01317 ACE_DEBUG ((LM_DEBUG,
01318 "ImR: <%s> Ping retry count exceeded. alive=maybe.\n", info.name.c_str ()));
01319 }
01320
01321
01322
01323 info.last_ping = ACE_OS::gettimeofday ();
01324 return true;
01325 }
01326
01327 int
01328 ImR_Locator_i::is_alive_i (Server_Info& info)
01329 {
01330
01331
01332 if (info.ior.length () == 0 || info.partial_ior.length () == 0)
01333 {
01334 if (debug_ > 1)
01335 {
01336 ACE_DEBUG ((LM_DEBUG,
01337 "ImR: <%s> not running. alive=false.\n", info.name.c_str ()));
01338 }
01339 info.last_ping = ACE_Time_Value::zero;
01340 return 0;
01341 }
01342
01343 if (ping_interval_ == ACE_Time_Value::zero)
01344 {
01345 if (debug_ > 1)
01346 {
01347 ACE_DEBUG ((LM_DEBUG,
01348 "ImR: <%s> Ping verification disabled. alive=true.\n", info.name.c_str ()));
01349 }
01350 return 1;
01351 }
01352
01353 if ((ACE_OS::gettimeofday () - info.last_ping) < ping_interval_)
01354 {
01355 if (debug_ > 1)
01356 {
01357 ACE_DEBUG ((LM_DEBUG,
01358 "ImR: <%s> within ping interval. alive=true.\n", info.name.c_str ()));
01359 }
01360 return 1;
01361 }
01362
01363
01364
01365
01366 if (info.cmdline.length () == 0 || ! repository_.has_activator (info.activator))
01367 {
01368 if (debug_ > 1)
01369 {
01370 ACE_DEBUG ((LM_DEBUG,
01371 "ImR: Ping verification skipped. <%s> not startable.\n", info.name.c_str ()));
01372 }
01373 return 1;
01374 }
01375
01376 connect_server (info);
01377
01378 if (CORBA::is_nil (info.server.in ()))
01379 {
01380 if (debug_ > 1)
01381 {
01382 ACE_DEBUG ((LM_DEBUG,
01383 "ImR: <%s> Could not connect. alive=false.\n", info.name.c_str ()));
01384 }
01385 return 0;
01386 }
01387
01388 try
01389 {
01390
01391 ImplementationRepository::ServerObject_var server = info.server;
01392
01393
01394 server->ping ();
01395
01396 if (debug_ > 1)
01397 {
01398 ACE_DEBUG ((LM_DEBUG,
01399 "ImR: <%s> Ping successful. alive=true\n", info.name.c_str ()));
01400 }
01401 info.last_ping = ACE_OS::gettimeofday ();
01402 }
01403 catch (const CORBA::TRANSIENT& ex)
01404 {
01405 const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80;
01406 switch (ex.minor () & BITS_5_THRU_12_MASK)
01407 {
01408 case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
01409 {
01410 if (debug_ > 1)
01411 {
01412 ACE_DEBUG ((LM_DEBUG,
01413 "ImR: <%s> Local TRANSIENT. alive=false.\n", info.name.c_str ()));
01414 }
01415 }
01416 info.last_ping = ACE_Time_Value::zero;
01417 return 0;
01418 case TAO_POA_DISCARDING:
01419 case TAO_POA_HOLDING:
01420 {
01421 if (debug_ > 1)
01422 {
01423 ACE_DEBUG ((LM_DEBUG,
01424 "ImR: <%s> Remote TRANSIENT. alive=maybe.\n", info.name.c_str ()));
01425 }
01426 }
01427 return -1;
01428
01429
01430 default:
01431 {
01432 if (debug_ > 1)
01433 {
01434 ACE_DEBUG ((LM_DEBUG,
01435 "ImR: <%s> TRANSIENT exception. alive=false.\n", info.name.c_str ()));
01436 }
01437 info.last_ping = ACE_Time_Value::zero;
01438 }
01439 return 0;
01440 }
01441 }
01442 catch (const CORBA::TIMEOUT&)
01443 {
01444 if (debug_ > 1)
01445 {
01446 ACE_DEBUG ((LM_DEBUG,
01447 "ImR: <%s> Ping timed out. alive=true.\n", info.name.c_str ()));
01448 }
01449 return 1;
01450
01451
01452
01453 }
01454 catch (const CORBA::Exception& ex)
01455 {
01456 if (debug_ > 1)
01457 {
01458 ACE_DEBUG ((LM_DEBUG, "ImR: <%s> Unexpected Ping exception. alive=false\n", info.name.c_str ()));
01459 ex._tao_print_exception ("\n");
01460 }
01461 info.last_ping = ACE_Time_Value::zero;
01462 return false;
01463 }
01464 return 1;
01465 }
01466
01467 int
01468 ImR_Locator_i::debug () const
01469 {
01470 return debug_;
01471 }