00001
00002
00003 #include "ImR_Locator_i.h"
00004 #include "utils.h"
00005 #include "Iterator.h"
00006 #include "INS_Locator.h"
00007
00008 #include "orbsvcs/Time_Utilities.h"
00009
00010 #include "tao/IORTable/IORTable.h"
00011 #include "tao/PortableServer/PortableServer.h"
00012 #include "tao/ORB_Core.h"
00013 #include "tao/default_ports.h"
00014 #include "tao/Messaging/Messaging.h"
00015 #include "tao/AnyTypeCode/Any.h"
00016
00017 #include "ace/ARGV.h"
00018 #include "ace/OS_NS_sys_time.h"
00019 #include "ace/Vector_T.h"
00020
// Start limit given to servers that are auto-added by server_is_running ()
// without having been registered through add_or_update_server ().
static const int DEFAULT_START_LIMIT = 1;

// NOTE(review): not referenced in the visible part of this file; presumably
// the delays (in milliseconds) between successive ping attempts -- confirm
// against the code that consumes it.
static const int PING_RETRY_SCHEDULE[] = {0, 10, 100, 500, 1000, 1000, 1000, 1000, 5000, 5000};

// ACE_Time_Value (seconds, microseconds): 10 * 1000 usec, i.e. 10 ms.
// NOTE(review): not referenced in the visible part of this file.
static const ACE_Time_Value DEFAULT_SERVER_TIMEOUT (0, 10 * 1000);

// 5000 * 1000 usec, i.e. 5 s.  Used as the round-trip timeout for the
// ServerObject::shutdown () call issued by shutdown_server ().
static const ACE_Time_Value DEFAULT_SHUTDOWN_TIMEOUT (0, 5000 * 1000);
00031
00032 static PortableServer::POA_ptr
00033 createPersistentPOA (PortableServer::POA_ptr root_poa, const char* poa_name) {
00034
00035 PortableServer::LifespanPolicy_var life =
00036 root_poa->create_lifespan_policy (PortableServer::PERSISTENT);
00037
00038 PortableServer::IdAssignmentPolicy_var assign =
00039 root_poa->create_id_assignment_policy (PortableServer::USER_ID);
00040
00041 CORBA::PolicyList pols;
00042 pols.length (2);
00043 pols[0] = PortableServer::LifespanPolicy::_duplicate (life.in ());
00044 pols[1] = PortableServer::IdAssignmentPolicy::_duplicate (assign.in ());
00045
00046 PortableServer::POAManager_var mgr = root_poa->the_POAManager ();
00047 PortableServer::POA_var poa =
00048 root_poa->create_POA (poa_name, mgr.in (), pols);
00049
00050 life->destroy ();
00051 assign->destroy ();
00052
00053 return poa._retn ();
00054 }
00055
00056 ImR_Locator_i::ImR_Locator_i (void)
00057 : forwarder_ (*this)
00058 , ins_locator_ (0)
00059 , debug_ (0)
00060 , read_only_ (false)
00061 {
00062
00063
00064
00065 INS_Locator* locator;
00066 ACE_NEW (locator,
00067 INS_Locator (*this));
00068 ins_locator_ = locator;
00069 }
00070
/**
 * Destructor.
 *
 * Intentionally empty: no explicit cleanup is performed here.  The POA and
 * ORB teardown visible in this file is done by fini ().
 */
ImR_Locator_i::~ImR_Locator_i (void)
{
}
00079
00080 int
00081 ImR_Locator_i::init_with_orb (CORBA::ORB_ptr orb, Options& opts)
00082 {
00083 orb_ = CORBA::ORB::_duplicate (orb);
00084 debug_ = opts.debug ();
00085 read_only_ = opts.readonly ();
00086 startup_timeout_ = opts.startup_timeout ();
00087 ping_interval_ = opts.ping_interval ();
00088
00089 CORBA::Object_var obj =
00090 this->orb_->resolve_initial_references ("RootPOA");
00091 this->root_poa_ = PortableServer::POA::_narrow (obj.in ());
00092 ACE_ASSERT (! CORBA::is_nil (this->root_poa_.in ()));
00093
00094 this->forwarder_.init (orb);
00095 this->adapter_.init (& this->forwarder_);
00096
00097
00098
00099 root_poa_->the_activator (&this->adapter_);
00100
00101
00102 this->imr_poa_ = createPersistentPOA (this->root_poa_.in (),
00103 "ImplRepo_Service");
00104 ACE_ASSERT (! CORBA::is_nil (this->imr_poa_.in ()));
00105
00106 waiter_svt_.debug (debug_ > 1);
00107 PortableServer::ObjectId_var id = PortableServer::string_to_ObjectId ("ImR_AsyncStartupWaiter");
00108 this->imr_poa_->activate_object_with_id (id.in (), &waiter_svt_);
00109 obj = this->imr_poa_->id_to_reference (id.in ());
00110 if (startup_timeout_ > ACE_Time_Value::zero)
00111 {
00112 obj = set_timeout_policy (obj.in (), startup_timeout_);
00113 }
00114 waiter_ = ImplementationRepository::AsyncStartupWaiter::_narrow (obj.in ());
00115
00116 id = PortableServer::string_to_ObjectId ("ImplRepo_Service");
00117 this->imr_poa_->activate_object_with_id (id.in (), this);
00118
00119 obj = this->imr_poa_->id_to_reference (id.in ());
00120 CORBA::String_var ior = this->orb_->object_to_string (obj.in ());
00121
00122
00123 obj = orb->resolve_initial_references ("IORTable");
00124 IORTable::Table_var ior_table = IORTable::Table::_narrow (obj.in ());
00125 ACE_ASSERT (! CORBA::is_nil (ior_table.in ()));
00126 ior_table->bind ("ImplRepoService", ior.in ());
00127 ior_table->bind ("ImR", ior.in ());
00128 ior_table->set_locator (this->ins_locator_.in ());
00129
00130
00131 if (opts.multicast ())
00132 {
00133 ACE_Reactor* reactor = orb->orb_core ()->reactor ();
00134 if (this->setup_multicast (reactor, ior.in ()) != 0)
00135 return -1;
00136 }
00137
00138
00139
00140
00141
00142 int init_result =
00143 this->repository_.init (opts);
00144 if (init_result == -1)
00145 {
00146 ACE_ERROR_RETURN ((LM_ERROR, "Repository failed to initialize\n"), -1);
00147 }
00148
00149
00150 PortableServer::POAManager_var poaman =
00151 this->root_poa_->the_POAManager ();
00152 poaman->activate ();
00153 poaman = this->imr_poa_->the_POAManager ();
00154 poaman->activate ();
00155
00156
00157 if (opts.ior_filename ().length () > 0)
00158 {
00159 FILE* fp = ACE_OS::fopen (opts.ior_filename ().c_str (), "w");
00160 if (fp == 0)
00161 {
00162 ACE_ERROR_RETURN ((LM_ERROR,
00163 "ImR: Could not open file: %s\n", opts.ior_filename ().c_str ()), -1);
00164 }
00165 ACE_OS::fprintf (fp, "%s", ior.in ());
00166 ACE_OS::fclose (fp);
00167 }
00168
00169 return 0;
00170 }
00171
00172 int
00173 ImR_Locator_i::init (Options& opts)
00174 {
00175 ACE_CString cmdline = opts.cmdline ();
00176 cmdline += " -orbcollocation no -orbuseimr 0";
00177 ACE_ARGV av (cmdline.c_str ());
00178 int argc = av.argc ();
00179 char** argv = av.argv ();
00180
00181 CORBA::ORB_var orb = CORBA::ORB_init (argc, argv, "TAO_ImR_Locator");
00182 int err = this->init_with_orb (orb.in (), opts);
00183 return err;
00184 }
00185
00186 int
00187 ImR_Locator_i::run (void)
00188 {
00189 if (debug_ > 0)
00190 {
00191
00192
00193
00194
00195
00196 ACE_DEBUG ((LM_DEBUG,
00197 "Implementation Repository: Running\n"
00198 "\tPing Interval : %dms\n"
00199 "\tStartup Timeout : %ds\n"
00200 "\tPersistence : %s\n"
00201 "\tMulticast : %s\n",
00202 ping_interval_.msec (),
00203 startup_timeout_.sec (),
00204 repository_.repo_mode (),
00205 ior_multicast_.reactor () != 0 ? "Enabled" : "Disabled"));
00206 ACE_DEBUG ((LM_DEBUG,
00207 "\tDebug : %d\n"
00208 "\tLocked : %s\n\n",
00209 debug (),
00210 read_only_ ? "True" : "False"));
00211 }
00212 this->auto_start_servers ();
00213
00214 this->orb_->run ();
00215 return 0;
00216 }
00217
00218 void
00219 ImR_Locator_i::shutdown (CORBA::Boolean activators, CORBA::Boolean servers)
00220 {
00221 if (servers != 0 && this->repository_.servers ().current_size () > 0)
00222 {
00223
00224 ACE_ERROR ((LM_ERROR, "ImR: Shutdown of all servers not implemented.\n"));
00225 }
00226 if (activators != 0 && this->repository_.activators ().current_size () > 0)
00227 {
00228 ACE_Vector<ImplementationRepository::Activator_var> acts;
00229 Locator_Repository::AIMap::ENTRY* entry = 0;
00230 Locator_Repository::AIMap::ITERATOR it (this->repository_.activators ());
00231 for (;it.next (entry) != 0; it.advance ())
00232 {
00233 Activator_Info_Ptr info = entry->int_id_;
00234 ACE_ASSERT (! info.null ());
00235 connect_activator (*info);
00236 if (! CORBA::is_nil (info->activator.in ()))
00237 acts.push_back (info->activator);
00238 }
00239
00240 int shutdown_errs = 0;
00241
00242 for (size_t i = 0; i < acts.size (); ++i)
00243 {
00244 try
00245 {
00246 acts[i]->shutdown ();
00247 acts[i] = ImplementationRepository::Activator::_nil ();
00248 }
00249 catch (const CORBA::Exception& ex)
00250 {
00251 ++shutdown_errs;
00252 if (debug_ > 1)
00253 {
00254 ex._tao_print_exception (
00255 "ImR: shutdown activator");
00256 }
00257 }
00258 }
00259 if (debug_ > 0 && shutdown_errs > 0)
00260 {
00261 ACE_DEBUG ((LM_DEBUG, "ImR: Some activators could not be shut down.\n"));
00262 }
00263 }
00264
00265
00266 this->shutdown (false);
00267 }
00268
00269 void
00270 ImR_Locator_i::shutdown (bool wait_for_completion)
00271 {
00272 this->orb_->shutdown (wait_for_completion);
00273 }
00274
00275 int
00276 ImR_Locator_i::fini (void)
00277 {
00278 try
00279 {
00280 if (debug_ > 1)
00281 ACE_DEBUG ((LM_DEBUG, "ImR: Shutting down...\n"));
00282
00283 teardown_multicast ();
00284
00285 this->root_poa_->destroy (1, 1);
00286
00287 this->orb_->destroy ();
00288
00289 if (debug_ > 0)
00290 ACE_DEBUG ((LM_DEBUG, "ImR: Shut down successfully.\n"));
00291 }
00292 catch (const CORBA::Exception& ex)
00293 {
00294 ex._tao_print_exception ("ImR_Locator_i::fini");
00295 throw;
00296 }
00297 return 0;
00298 }
00299
00300 void
00301 ImR_Locator_i::teardown_multicast ()
00302 {
00303 ACE_Reactor* r = ior_multicast_.reactor ();
00304 if (r != 0) {
00305 r->remove_handler (&ior_multicast_, ACE_Event_Handler::READ_MASK);
00306 ior_multicast_.reactor (0);
00307 }
00308 }
00309
00310 int
00311 ImR_Locator_i::setup_multicast (ACE_Reactor* reactor, const char* ior)
00312 {
00313 ACE_ASSERT (reactor != 0);
00314 ACE_ASSERT (ior != 0);
00315 #if defined (ACE_HAS_IP_MULTICAST)
00316
00317 TAO_ORB_Core* core = TAO_ORB_Core_instance ();
00318
00319 ACE_CString mde (core->orb_params ()->mcast_discovery_endpoint ());
00320
00321 if (mde.length () != 0)
00322 {
00323 if (this->ior_multicast_.init (ior,
00324 mde.c_str (), TAO_SERVICEID_IMPLREPOSERVICE) == -1)
00325 {
00326 return -1;
00327 }
00328 }
00329 else
00330 {
00331
00332 CORBA::UShort port =
00333 core->orb_params ()->service_port (TAO::MCAST_IMPLREPOSERVICE);
00334 if (port == 0)
00335 {
00336
00337 const char* port_number = ACE_OS::getenv ("ImplRepoServicePort");
00338
00339 if (port_number != 0)
00340 port = static_cast<CORBA::UShort> (ACE_OS::atoi (port_number));
00341 }
00342 if (port == 0)
00343 port = TAO_DEFAULT_IMPLREPO_SERVER_REQUEST_PORT;
00344
00345 if (this->ior_multicast_.init (ior, port,
00346 ACE_DEFAULT_MULTICAST_ADDR, TAO_SERVICEID_IMPLREPOSERVICE) == -1)
00347 {
00348 return -1;
00349 }
00350 }
00351
00352
00353 if (reactor->register_handler (&this->ior_multicast_,
00354 ACE_Event_Handler::READ_MASK) == -1)
00355 {
00356 if (debug_ >= 1)
00357 ACE_DEBUG ((LM_DEBUG, "ImR: cannot register Event handler\n"));
00358 return -1;
00359 }
00360 #else
00361 ACE_UNUSED_ARG (reactor);
00362 ACE_UNUSED_ARG (ior);
00363 #endif
00364 return 0;
00365 }
00366
00367 CORBA::Long
00368 ImR_Locator_i::register_activator (const char* aname,
00369 ImplementationRepository::Activator_ptr activator)
00370 {
00371 ACE_ASSERT (aname != 0);
00372 ACE_ASSERT (! CORBA::is_nil (activator));
00373
00374
00375
00376 this->unregister_activator_i (aname);
00377
00378 CORBA::String_var ior =
00379 this->orb_->object_to_string (activator);
00380
00381 CORBA::Long token = ACE_OS::gettimeofday ().msec ();
00382
00383 int err = this->repository_.add_activator (aname, token, ior.in (), activator);
00384 ACE_ASSERT (err == 0);
00385 ACE_UNUSED_ARG (err);
00386
00387 if (this->debug_ > 0)
00388 ACE_DEBUG ((LM_DEBUG, "ImR: Activator registered for %s.\n", aname));
00389
00390 return token;
00391 }
00392
00393 void
00394 ImR_Locator_i::unregister_activator (const char* aname,
00395 CORBA::Long token)
00396 {
00397 ACE_ASSERT (aname != 0);
00398 Activator_Info_Ptr info = this->get_activator (aname);
00399
00400 if (! info.null ())
00401 {
00402 if (info->token != token && this->debug_ > 0)
00403 {
00404 ACE_DEBUG ((LM_DEBUG, "ImR: Ignoring unregister activator:%s. Wrong token.\n", aname));
00405 return;
00406 }
00407
00408 this->unregister_activator_i (aname);
00409
00410 if (this->debug_ > 0)
00411 ACE_DEBUG ((LM_DEBUG, "ImR: Activator %s unregistered.\n", aname));
00412 }
00413 else
00414 {
00415 if (this->debug_ > 0)
00416 ACE_DEBUG ((LM_DEBUG, "ImR: Ignoring unregister activator:%s. Unknown activator.\n", aname));
00417 }
00418 }
00419
00420 void
00421 ImR_Locator_i::unregister_activator_i (const char* aname)
00422 {
00423 ACE_ASSERT (aname != 0);
00424 int err = this->repository_.remove_activator (aname);
00425 ACE_UNUSED_ARG (err);
00426 }
00427
00428 void
00429 ImR_Locator_i::notify_child_death (const char* name)
00430 {
00431 ACE_ASSERT (name != 0);
00432
00433 if (this->debug_ > 1)
00434 ACE_DEBUG ((LM_DEBUG, "ImR: Server has died <%s>.\n", name));
00435
00436 Server_Info_Ptr info = this->repository_.get_server (name);
00437 if (! info.null ())
00438 {
00439 info->ior = "";
00440 info->partial_ior = "";
00441
00442 int err = this->repository_.update_server (*info);
00443 ACE_ASSERT (err == 0);
00444 ACE_UNUSED_ARG (err);
00445 }
00446 else
00447 {
00448 if (this->debug_ > 1)
00449 ACE_DEBUG ((LM_DEBUG,
00450 "ImR: Failed to find server in repository.\n"));
00451 }
00452 }
00453
00454 void
00455 ImR_Locator_i::activate_server (const char* server)
00456 {
00457 if (debug_ > 1)
00458 ACE_DEBUG ((LM_DEBUG, "ImR: Manually activating server <%s>\n", server));
00459
00460
00461
00462 CORBA::String_var cleanup =
00463 activate_server_by_name (server, true);
00464 }
00465
00466 char*
00467 ImR_Locator_i::activate_server_by_name (const char* name, bool manual_start)
00468 {
00469
00470
00471 ACE_ASSERT (name != 0);
00472
00473 Server_Info_Ptr info = this->repository_.get_server (name);
00474 if (info.null ())
00475 {
00476 ACE_ERROR ((LM_ERROR, "ImR: Cannot find info for server <%s>\n", name));
00477 throw ImplementationRepository::NotFound ();
00478 }
00479
00480 return activate_server_i (*info, manual_start);
00481 }
00482
00483 char*
00484 ImR_Locator_i::activate_server_by_object (const char* object_name)
00485 {
00486 ACE_ASSERT (object_name != 0);
00487
00488
00489
00490 ACE_CString server_name (object_name);
00491 ACE_CString::size_type pos = server_name.find ('/');
00492 if (pos != ACE_CString::npos)
00493 server_name = server_name.substr (pos + 1);
00494
00495 return activate_server_by_name (server_name.c_str (), false);
00496 }
00497
00498 char*
00499 ImR_Locator_i::activate_server_i (Server_Info& info, bool manual_start)
00500 {
00501 if (info.activation_mode == ImplementationRepository::PER_CLIENT)
00502 {
00503 return activate_perclient_server_i (info, manual_start);
00504 }
00505
00506 while (true)
00507 {
00508 if (is_alive (info))
00509 {
00510 if (debug_ > 1)
00511 {
00512 ACE_DEBUG ((LM_DEBUG, "ImR: Successfully activated <%s> at \n\t%s\n",
00513 info.name.c_str (), info.partial_ior.c_str ()));
00514 }
00515 info.start_count = 0;
00516
00517 waiter_svt_.unblock_all (info.name.c_str ());
00518
00519 return CORBA::string_dup (info.partial_ior.c_str ());
00520 }
00521
00522 info.reset ();
00523
00524 if (! info.starting && info.start_count >= info.start_limit)
00525 {
00526 if (this->debug_ > 0)
00527 {
00528 ACE_DEBUG ((LM_DEBUG,
00529 "ImR: Cannot Activate <%s>.\n", info.name.c_str ()));
00530 }
00531
00532 waiter_svt_.unblock_all (info.name.c_str ());
00533
00534 throw ImplementationRepository::CannotActivate(
00535 CORBA::string_dup (
00536 "Cannot start server."));
00537 }
00538
00539
00540 ImplementationRepository::StartupInfo_var si =
00541 start_server (info, manual_start, info.waiting_clients);
00542 }
00543 }
00544
00545 char*
00546 ImR_Locator_i::activate_perclient_server_i (Server_Info info, bool manual_start)
00547 {
00548 Server_Info_Ptr shared_info = this->repository_.get_server (info.name);
00549 do
00550 {
00551 ImplementationRepository::StartupInfo* psi =
00552 start_server (info, manual_start, shared_info->waiting_clients);
00553
00554 if (psi != 0)
00555 {
00556 ImplementationRepository::StartupInfo_var si = psi;
00557 ACE_ASSERT (info.name == si->name.in ());
00558 info.partial_ior = si->partial_ior.in ();
00559 info.ior = si->ior.in ();
00560
00561 if (is_alive (info))
00562 {
00563 if (debug_ > 1)
00564 {
00565 ACE_DEBUG ((LM_DEBUG, "ImR: Successfully activated <%s> at \n\t%s\n",
00566 info.name.c_str (), info.partial_ior.c_str ()));
00567 }
00568 return CORBA::string_dup (info.partial_ior.c_str ());
00569 }
00570 info.reset ();
00571 }
00572 } while (info.start_count < info.start_limit);
00573
00574 if (this->debug_ > 0)
00575 {
00576 ACE_DEBUG ((LM_DEBUG,
00577 "ImR: Cannot Activate <%s>.\n", info.name.c_str ()));
00578 }
00579 throw ImplementationRepository::CannotActivate(
00580 CORBA::string_dup (
00581 "Cannot start server."));
00582 }
00583
00584 ImplementationRepository::StartupInfo*
00585 ImR_Locator_i::start_server (Server_Info& info, bool manual_start,
00586 int& waiting_clients)
00587 {
00588 if (info.activation_mode == ImplementationRepository::MANUAL && ! manual_start)
00589 {
00590 if (debug_ > 0)
00591 ACE_DEBUG ((LM_DEBUG, "ImR: Cannot start server <%s>. ActivationMode=MANUAL\n", info.name.c_str ()));
00592 throw ImplementationRepository::CannotActivate(
00593 CORBA::string_dup (
00594 "Cannot implicitly activate MANUAL server."));
00595 }
00596 if (info.cmdline.length () == 0)
00597 {
00598 if (debug_ > 0)
00599 ACE_DEBUG ((LM_DEBUG, "ImR: Cannot start server <%s>."
00600 " No command line.\n", info.name.c_str ()));
00601 throw ImplementationRepository::CannotActivate(
00602 CORBA::string_dup (
00603 "No command line registered for server."));
00604 }
00605
00606 Activator_Info_Ptr ainfo = get_activator (info.activator);
00607
00608 if (ainfo.null () || CORBA::is_nil (ainfo->activator.in ()))
00609 {
00610 if (debug_ > 0)
00611 ACE_DEBUG ((LM_DEBUG, "ImR: Cannot start server <%s>. "
00612 "Activator <%s> not found.\n", info.name.c_str (), info.activator.c_str ()));
00613 throw ImplementationRepository::CannotActivate(
00614 CORBA::string_dup (
00615 "No activator registered for server."));
00616 }
00617
00618 try
00619 {
00620 ++waiting_clients;
00621
00622 if (waiting_clients <= 1 ||
00623 info.activation_mode == ImplementationRepository::PER_CLIENT)
00624 {
00625 info.starting = true;
00626 ++info.start_count;
00627 ACE_ASSERT (info.start_count <= info.start_limit);
00628 if (this->debug_ > 0)
00629 {
00630 ACE_DEBUG ((LM_DEBUG, "ImR: Starting server <%s>. Attempt %d/%d.\n",
00631 info.name.c_str (), info.start_count, info.start_limit));
00632 }
00633 ainfo->activator->start_server (
00634 info.name.c_str (),
00635 info.cmdline.c_str (),
00636 info.dir.c_str (),
00637 info.env_vars);
00638 }
00639
00640 if (info.partial_ior.length () == 0)
00641 {
00642 if (this->debug_ > 0)
00643 {
00644 ACE_DEBUG ((LM_DEBUG, "ImR: Waiting for <%s> to start...\n", info.name.c_str ()));
00645 }
00646
00647 ImplementationRepository::StartupInfo_var si =
00648 waiter_->wait_for_startup (info.name.c_str ());
00649
00650 --waiting_clients;
00651 info.starting = false;
00652
00653 return si._retn ();
00654 }
00655 else
00656 {
00657 if (this->debug_ > 0)
00658 {
00659 ACE_DEBUG ((LM_DEBUG, "ImR: <%s> Skipping wait. Already started.\n", info.name.c_str ()));
00660 }
00661 --waiting_clients;
00662 info.starting = false;
00663 }
00664 }
00665 catch (const CORBA::TIMEOUT&)
00666 {
00667 --waiting_clients;
00668 info.starting = false;
00669
00670
00671
00672 if (info.partial_ior.length () == 0)
00673 {
00674 if (debug_ > 0)
00675 ACE_DEBUG ((LM_DEBUG, "ImR : Timeout waiting for <%s> to start.\n", info.name.c_str ()));
00676 info.reset ();
00677 }
00678 }
00679 catch (const ImplementationRepository::CannotActivate&)
00680 {
00681 --waiting_clients;
00682 info.starting = false;
00683 info.reset ();
00684 if (debug_ > 0)
00685 ACE_DEBUG ((LM_DEBUG, "ImR: Activator cannot start <%s>.\n", info.name.c_str ()));
00686 }
00687 catch (const CORBA::Exception& ex)
00688 {
00689 --waiting_clients;
00690 info.starting = false;
00691 if (debug_ > 0)
00692 ACE_DEBUG ((LM_DEBUG, "ImR: Unexpected exception while starting <%s>.\n", info.name.c_str ()));
00693 if (debug_ > 1)
00694 ex._tao_print_exception ("");
00695 info.reset ();
00696
00697
00698
00699 bool dead_activator = false;
00700 try
00701 {
00702 dead_activator = ainfo->activator->_non_existent ();
00703 }
00704 catch (const CORBA::Exception&)
00705 {
00706 dead_activator = true;
00707 }
00708
00709 if (dead_activator)
00710 {
00711
00712
00713
00714
00715 ainfo->reset ();
00716 }
00717 }
00718 return 0;
00719 }
00720
00721 CORBA::Object_ptr
00722 ImR_Locator_i::set_timeout_policy (CORBA::Object_ptr obj, const ACE_Time_Value& to)
00723 {
00724 CORBA::Object_var ret (CORBA::Object::_duplicate (obj));
00725
00726 try
00727 {
00728 TimeBase::TimeT timeout;
00729 ORBSVCS_Time::Time_Value_to_TimeT (timeout, to);
00730 CORBA::Any tmp;
00731 tmp <<= timeout;
00732
00733 CORBA::PolicyList policies (1);
00734 policies.length (1);
00735 policies[0] = orb_->create_policy (Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, tmp);
00736
00737 ret = obj->_set_policy_overrides (policies, CORBA::ADD_OVERRIDE);
00738
00739 policies[0]->destroy ();
00740
00741 if (CORBA::is_nil (ret.in ()))
00742 {
00743 if (this->debug_ > 0)
00744 {
00745 ACE_DEBUG ((LM_DEBUG, "ImR: Unable to set timeout policy.\n"));
00746 }
00747 ret = CORBA::Object::_duplicate (obj);
00748 }
00749 }
00750 catch (const CORBA::Exception& ex)
00751 {
00752 ex._tao_print_exception (
00753 "ImR_Locator_i::set_timeout_policy ()");
00754 }
00755
00756 return ret._retn ();
00757 }
00758
00759 void
00760 ImR_Locator_i::add_or_update_server (const char* server,
00761 const ImplementationRepository::StartupOptions &options)
00762 {
00763 ACE_ASSERT (server != 0);
00764
00765 if (this->read_only_)
00766 {
00767 ACE_DEBUG ((LM_DEBUG, "ImR: Cannot add/update server <%s> due to locked database.\n", server));
00768 throw CORBA::NO_PERMISSION (
00769 CORBA::SystemException::_tao_minor_code (
00770 TAO_IMPLREPO_MINOR_CODE,
00771 0),
00772 CORBA::COMPLETED_NO);
00773 }
00774
00775 if (debug_ > 0)
00776 ACE_DEBUG ((LM_DEBUG, "ImR: Add/Update server <%s>.\n", server));
00777
00778 int limit = options.start_limit;
00779 if (limit < 0)
00780 {
00781 limit = -limit;
00782 }
00783 else if (limit == 0)
00784 {
00785 limit = 1;
00786 }
00787
00788 Server_Info_Ptr info = this->repository_.get_server (server);
00789 if (info.null ())
00790 {
00791 if (this->debug_ > 1)
00792 ACE_DEBUG ((LM_DEBUG, "ImR: Adding server <%s>.\n", server));
00793
00794 this->repository_.add_server (server,
00795 options.activator.in (),
00796 options.command_line.in (),
00797 options.environment,
00798 options.working_directory.in (),
00799 options.activation,
00800 limit);
00801 }
00802 else
00803 {
00804 if (this->debug_ > 1)
00805 ACE_DEBUG ((LM_DEBUG, "ImR: Updating server <%s>.\n", server));
00806
00807 info->activator = options.activator.in ();
00808 info->cmdline = options.command_line.in ();
00809 info->env_vars = options.environment;
00810 info->dir = options.working_directory.in ();
00811 info->activation_mode = options.activation;
00812 info->start_limit = limit;
00813 info->start_count = 0;
00814 int err = this->repository_.update_server (*info);
00815 ACE_ASSERT (err == 0);
00816 ACE_UNUSED_ARG (err);
00817 }
00818
00819 if (this->debug_ > 1)
00820 {
00821
00822 ACE_DEBUG ((LM_DEBUG, "ImR: Server: %s\n"
00823 "\tActivator: %s\n"
00824 "\tCommand Line: %s\n"
00825 "\tWorking Directory: %s\n"
00826 "\tActivation: %s\n"
00827 "\tStart Limit: %d\n"
00828 "\n",
00829 server,
00830 options.activator.in (),
00831 options.command_line.in (),
00832 options.working_directory.in (),
00833 ImR_Utils::activationModeToString (options.activation).c_str (),
00834 limit
00835 ));
00836
00837 for (CORBA::ULong i = 0; i < options.environment.length (); ++i)
00838 ACE_DEBUG ((LM_DEBUG, "Environment variable %s=%s\n",
00839 options.environment[i].name.in (),
00840 options.environment[i].value.in ()));
00841 }
00842 }
00843
00844 void
00845 ImR_Locator_i::remove_server (const char* name)
00846 {
00847 ACE_ASSERT (name != 0);
00848 if (this->read_only_)
00849 {
00850 ACE_ERROR ((LM_ERROR,
00851 "ImR: Can't remove server <%s> due to locked database.\n", name));
00852 throw CORBA::NO_PERMISSION (
00853 CORBA::SystemException::_tao_minor_code (
00854 TAO_IMPLREPO_MINOR_CODE,
00855 0),
00856 CORBA::COMPLETED_NO);
00857 }
00858
00859
00860
00861
00862
00863 Server_Info_Ptr info = this->repository_.get_server (name);
00864 if (! info.null ())
00865 {
00866 if (this->repository_.remove_server (name) == 0)
00867 {
00868 if (this->debug_ > 1)
00869 ACE_DEBUG ((LM_DEBUG, "ImR: Removing Server <%s>...\n", name));
00870
00871 PortableServer::POA_var poa = findPOA (name);
00872 if (! CORBA::is_nil (poa.in ()))
00873 {
00874 bool etherealize = true;
00875 bool wait = false;
00876 poa->destroy (etherealize, wait);
00877 }
00878 if (this->debug_ > 0)
00879 ACE_DEBUG ((LM_DEBUG, "ImR: Removed Server <%s>.\n", name));
00880 }
00881 }
00882 else
00883 {
00884 ACE_ERROR ((LM_ERROR,
00885 "ImR: Can't remove unknown server <%s>.\n", name));
00886 throw ImplementationRepository::NotFound ();
00887 }
00888 }
00889
00890 PortableServer::POA_ptr
00891 ImR_Locator_i::findPOA (const char* name)
00892 {
00893 try
00894 {
00895 bool activate_it = false;
00896 return root_poa_->find_POA (name, activate_it);
00897 }
00898 catch (const CORBA::Exception&)
00899 {
00900 }
00901 return PortableServer::POA::_nil ();
00902 }
00903
00904 void
00905 ImR_Locator_i::shutdown_server (const char* server)
00906 {
00907 ACE_ASSERT (server != 0);
00908
00909 if (this->debug_ > 0)
00910 ACE_DEBUG ((LM_DEBUG, "ImR: Shutting down server <%s>.\n", server));
00911
00912 Server_Info_Ptr info = this->repository_.get_server (server);
00913 if (info.null ())
00914 {
00915 ACE_ERROR ((LM_ERROR,
00916 "ImR: shutdown_server () Cannot find info for server <%s>\n", server));
00917 throw ImplementationRepository::NotFound ();
00918 }
00919
00920 connect_server (*info);
00921
00922 if (CORBA::is_nil (info->server.in ()))
00923 {
00924 ACE_ERROR ((LM_ERROR,
00925 "ImR: shutdown_server () Cannot connect to server <%s>\n", server));
00926 throw ImplementationRepository::NotFound ();
00927 }
00928
00929 try
00930 {
00931 CORBA::Object_var obj = set_timeout_policy (info->server.in (), DEFAULT_SHUTDOWN_TIMEOUT);
00932 ImplementationRepository::ServerObject_var server =
00933 ImplementationRepository::ServerObject::_unchecked_narrow (obj.in ());
00934 server->shutdown ();
00935 }
00936 catch (const CORBA::TIMEOUT&)
00937 {
00938 info->reset ();
00939 int err = this->repository_.update_server (*info);
00940 ACE_ASSERT (err == 0);
00941 ACE_UNUSED_ARG (err);
00942
00943
00944 if (this->debug_ > 1)
00945 {
00946 ACE_DEBUG ((LM_DEBUG, "ImR: Timeout while waiting for <%s> shutdown.\n", server));
00947 }
00948 throw;
00949 }
00950 catch (const CORBA::Exception&)
00951 {
00952 if (this->debug_ > 1)
00953 {
00954 ACE_DEBUG ((LM_DEBUG, "ImR: Exception ignored while shutting down <%s>\n", server));
00955 }
00956 }
00957
00958
00959
00960 info->reset ();
00961
00962 int err = this->repository_.update_server (*info);
00963 ACE_ASSERT (err == 0);
00964 ACE_UNUSED_ARG (err);
00965 }
00966
00967 void
00968 ImR_Locator_i::server_is_running (const char* name,
00969 const char* partial_ior,
00970 ImplementationRepository::ServerObject_ptr server)
00971 {
00972 ACE_ASSERT (name != 0);
00973 ACE_ASSERT (partial_ior != 0);
00974 ACE_ASSERT (! CORBA::is_nil (server));
00975
00976 if (this->debug_ > 0)
00977 ACE_DEBUG ((LM_DEBUG, "ImR: Server %s is running at %s.\n", name, partial_ior));
00978
00979 CORBA::String_var ior = orb_->object_to_string (server);
00980
00981 if (this->debug_ > 1)
00982 ACE_DEBUG ((LM_DEBUG, "ImR: Server %s callback at %s.\n", name, ior.in ()));
00983
00984 Server_Info_Ptr info = this->repository_.get_server (name);
00985 if (info.null ())
00986 {
00987 if (this->debug_ > 0)
00988 ACE_DEBUG ((LM_DEBUG, "ImR: Auto adding NORMAL server <%s>.\n", name));
00989
00990 ImplementationRepository::EnvironmentList env (0);
00991 this->repository_.add_server (name,
00992 "",
00993 "",
00994 ImplementationRepository::EnvironmentList (),
00995 "",
00996 ImplementationRepository::NORMAL,
00997 DEFAULT_START_LIMIT,
00998 partial_ior,
00999 ior.in (),
01000 ImplementationRepository::ServerObject::_nil ()
01001 );
01002 }
01003 else
01004 {
01005 if (info->activation_mode != ImplementationRepository::PER_CLIENT) {
01006 info->ior = ior.in ();
01007 info->partial_ior = partial_ior;
01008 info->server = ImplementationRepository::ServerObject::_nil ();
01009
01010 int err = this->repository_.update_server (*info);
01011 ACE_ASSERT (err == 0);
01012 ACE_UNUSED_ARG (err);
01013
01014 waiter_svt_.unblock_one (name, partial_ior, ior.in (), false);
01015 } else {
01016
01017
01018 if (info->waiting_clients > 0)
01019 {
01020 waiter_svt_.unblock_one (name, partial_ior, ior.in (), true);
01021 }
01022 else if (this->debug_ > 1)
01023 {
01024 ACE_DEBUG ((LM_DEBUG,
01025 ACE_TEXT ("ImR - Ignoring server_is_running due to no ")
01026 ACE_TEXT ("waiting PER_CLIENT clients.\n")));
01027 }
01028 }
01029 }
01030 }
01031
01032 void
01033 ImR_Locator_i::server_is_shutting_down (const char* server)
01034 {
01035 ACE_ASSERT (server != 0);
01036 Server_Info_Ptr info = this->repository_.get_server (server);
01037 if (info.null ())
01038 {
01039 if (this->debug_ > 1)
01040 {
01041 ACE_DEBUG ((LM_DEBUG,
01042 "ImR_Locator_i::server_is_shutting_down: Unknown server:%s\n", server));
01043 }
01044 return;
01045 }
01046
01047 if (this->debug_ > 0)
01048 ACE_DEBUG ((LM_DEBUG, "ImR: Server <%s> is shutting down.\n", server));
01049
01050 info->reset ();
01051
01052 int err = this->repository_.update_server (*info);
01053 ACE_ASSERT (err == 0);
01054 ACE_UNUSED_ARG (err);
01055 }
01056
01057 void
01058 ImR_Locator_i::find (const char* server,
01059 ImplementationRepository::ServerInformation_out imr_info)
01060 {
01061 ACE_ASSERT (server != 0);
01062 ACE_NEW_THROW_EX (imr_info, ImplementationRepository::ServerInformation, CORBA::NO_MEMORY ());
01063
01064 Server_Info_Ptr info = this->repository_.get_server (server);
01065 if (! info.null ())
01066 {
01067 imr_info = info->createImRServerInfo ();
01068
01069 if (this->debug_ > 1)
01070 ACE_DEBUG ((LM_DEBUG, "ImR: Found server %s.\n", server));
01071 }
01072 else
01073 {
01074 if (debug_ > 1)
01075 ACE_DEBUG ((LM_DEBUG, "ImR: Cannot find server <%s>\n", server));
01076 }
01077 }
01078
01079 void
01080 ImR_Locator_i::list (CORBA::ULong how_many,
01081 ImplementationRepository::ServerInformationList_out server_list,
01082 ImplementationRepository::ServerInformationIterator_out server_iterator)
01083 {
01084 if (this->debug_ > 1)
01085 ACE_DEBUG ((LM_DEBUG, "ImR: List servers.\n"));
01086
01087
01088
01089 server_iterator = ImplementationRepository::ServerInformationIterator::_nil ();
01090 ACE_NEW_THROW_EX (server_list,
01091 ImplementationRepository::ServerInformationList (0), CORBA::NO_MEMORY ());
01092
01093 Locator_Repository::SIMap::ENTRY* entry = 0;
01094 Locator_Repository::SIMap::ITERATOR it (this->repository_.servers ());
01095
01096
01097 CORBA::ULong n = this->repository_.servers ().current_size ();
01098 if (how_many > 0 && n > how_many)
01099 {
01100 n = how_many;
01101 }
01102
01103 server_list->length (n);
01104
01105 if (this->debug_ > 1)
01106 ACE_DEBUG ((LM_DEBUG, "ImR_Locator_i::list: Filling ServerList with %d servers\n", n));
01107
01108 for (CORBA::ULong i = 0; i < n; i++)
01109 {
01110 it.next (entry);
01111 it.advance ();
01112 ACE_ASSERT (entry != 0);
01113
01114 Server_Info_Ptr info = entry->int_id_;
01115
01116 ImplementationRepository::ServerInformation_var imr_info = info->createImRServerInfo ();
01117 server_list[i] = *imr_info;
01118 }
01119
01120 if (this->repository_.servers ().current_size () > n)
01121 {
01122 if (this->debug_ > 1)
01123 ACE_DEBUG ((LM_DEBUG, "ImR_Locator_i::list: Creating ServerInformation Iterator\n"));
01124
01125 ImR_Iterator* imr_iter = 0;
01126
01127 ACE_NEW_THROW_EX (imr_iter,
01128 ImR_Iterator (n, this->repository_, this->imr_poa_.in ()),
01129 CORBA::NO_MEMORY ());
01130
01131 PortableServer::ServantBase_var tmp (imr_iter);
01132
01133 try
01134 {
01135 PortableServer::ObjectId_var id =
01136 this->imr_poa_->activate_object (imr_iter);
01137 CORBA::Object_var obj = this->imr_poa_->id_to_reference (id.in ());
01138 server_iterator = ImplementationRepository::
01139 ServerInformationIterator::_unchecked_narrow (obj.in ());
01140 }
01141 catch (const CORBA::Exception&)
01142 {
01143 throw;
01144 }
01145 }
01146 }
01147
01148 Activator_Info_Ptr
01149 ImR_Locator_i::get_activator (const ACE_CString& aname)
01150 {
01151 Activator_Info_Ptr info = this->repository_.get_activator (aname);
01152 if (! info.null ())
01153 {
01154 this->connect_activator (*info);
01155 }
01156 return info;
01157 }
01158
01159 void
01160 ImR_Locator_i::connect_activator (Activator_Info& info)
01161 {
01162 if (! CORBA::is_nil (info.activator.in ()) || info.ior.length () == 0)
01163 return;
01164
01165 try
01166 {
01167 CORBA::Object_var obj =
01168 this->orb_->string_to_object (info.ior.c_str ());
01169
01170 if (CORBA::is_nil (obj.in ()))
01171 {
01172 info.reset ();
01173 return;
01174 }
01175
01176 if (startup_timeout_ > ACE_Time_Value::zero)
01177 {
01178 obj = set_timeout_policy (obj.in (), startup_timeout_);
01179 }
01180
01181 info.activator =
01182 ImplementationRepository::Activator::_unchecked_narrow (obj.in ());
01183
01184 if (CORBA::is_nil (info.activator.in ()))
01185 {
01186 info.reset ();
01187 return;
01188 }
01189
01190 if (debug_ > 1)
01191 ACE_DEBUG ((LM_DEBUG, "ImR: Connected to activator <%s>\n", info.name.c_str ()));
01192 }
01193 catch (const CORBA::Exception&)
01194 {
01195 info.reset ();
01196 }
01197 }
01198
01199 void
01200 ImR_Locator_i::auto_start_servers (void)
01201 {
01202 if (this->repository_.servers ().current_size () == 0)
01203 return;
01204
01205 Locator_Repository::SIMap::ENTRY* server_entry;
01206 Locator_Repository::SIMap::ITERATOR server_iter (this->repository_.servers ());
01207
01208
01209
01210
01211 for (;server_iter.next (server_entry) != 0; server_iter.advance ())
01212 {
01213 Server_Info_Ptr info = server_entry->int_id_;
01214 ACE_ASSERT (! info.null ());
01215
01216 try
01217 {
01218 if (info->activation_mode == ImplementationRepository::AUTO_START
01219 && info->cmdline.length () > 0)
01220 {
01221 CORBA::String_var cleanup =
01222 this->activate_server_i (*info, true);
01223 }
01224 }
01225 catch (const CORBA::Exception& ex)
01226 {
01227 if (this->debug_ > 1)
01228 {
01229 ACE_DEBUG ((LM_DEBUG,
01230 "ImR: AUTO_START Could not activate <%s>\n",
01231 server_entry->ext_id_.c_str ()));
01232 ex._tao_print_exception ("AUTO_START");
01233 }
01234
01235 }
01236 }
01237 }
01238
01239 void
01240 ImR_Locator_i::connect_server (Server_Info& info)
01241 {
01242 if (! CORBA::is_nil (info.server.in ()))
01243 {
01244 return;
01245 }
01246
01247 if (info.ior.length () == 0)
01248 {
01249 info.reset ();
01250 return;
01251 }
01252
01253 try
01254 {
01255 CORBA::Object_var obj = orb_->string_to_object (info.ior.c_str ());
01256
01257 if (CORBA::is_nil (obj.in ()))
01258 {
01259 info.reset ();
01260 return;
01261 }
01262
01263 obj = set_timeout_policy (obj.in (), DEFAULT_SERVER_TIMEOUT);
01264
01265 info.server =
01266 ImplementationRepository::ServerObject::_unchecked_narrow (obj.in ());
01267
01268 if (CORBA::is_nil (info.server.in ()))
01269 {
01270 info.reset ();
01271 return;
01272 }
01273
01274 if (debug_ > 1)
01275 ACE_DEBUG ((LM_DEBUG, "ImR: Connected to server <%s>\n", info.name.c_str ()));
01276 }
01277 catch (const CORBA::Exception&)
01278 {
01279 info.reset ();
01280 }
01281 }
01282
01283 bool
01284 ImR_Locator_i::is_alive (Server_Info& info)
01285 {
01286 const size_t table_size = sizeof (PING_RETRY_SCHEDULE) /
01287 sizeof (*PING_RETRY_SCHEDULE);
01288
01289 for (size_t i = 0; i < table_size; ++i)
01290 {
01291 int status = this->is_alive_i (info);
01292 if (status == 0)
01293 return false;
01294 if (status == 1)
01295 return true;
01296
01297
01298
01299
01300
01301
01302
01303
01304
01305
01306
01307
01308
01309
01310
01311 if (PING_RETRY_SCHEDULE[i] > 0)
01312 {
01313 ACE_Time_Value tv (0, PING_RETRY_SCHEDULE[i] * 1000);
01314 this->orb_->run (tv);
01315 }
01316 }
01317 if (debug_ > 0)
01318 {
01319 ACE_DEBUG ((LM_DEBUG,
01320 "ImR: <%s> Ping retry count exceeded. alive=maybe.\n", info.name.c_str ()));
01321 }
01322
01323
01324
01325 info.last_ping = ACE_OS::gettimeofday ();
01326 return true;
01327 }
01328
01329 int
01330 ImR_Locator_i::is_alive_i (Server_Info& info)
01331 {
01332
01333
01334 if (info.ior.length () == 0 || info.partial_ior.length () == 0)
01335 {
01336 if (debug_ > 1)
01337 {
01338 ACE_DEBUG ((LM_DEBUG,
01339 "ImR: <%s> not running. alive=false.\n", info.name.c_str ()));
01340 }
01341 info.last_ping = ACE_Time_Value::zero;
01342 return 0;
01343 }
01344
01345 if (ping_interval_ == ACE_Time_Value::zero)
01346 {
01347 if (debug_ > 1)
01348 {
01349 ACE_DEBUG ((LM_DEBUG,
01350 "ImR: <%s> Ping verification disabled. alive=true.\n", info.name.c_str ()));
01351 }
01352 return 1;
01353 }
01354
01355 if ((ACE_OS::gettimeofday () - info.last_ping) < ping_interval_)
01356 {
01357 if (debug_ > 1)
01358 {
01359 ACE_DEBUG ((LM_DEBUG,
01360 "ImR: <%s> within ping interval. alive=true.\n", info.name.c_str ()));
01361 }
01362 return 1;
01363 }
01364
01365
01366
01367
01368 if (info.cmdline.length () == 0 || ! repository_.has_activator (info.activator))
01369 {
01370 if (debug_ > 1)
01371 {
01372 ACE_DEBUG ((LM_DEBUG,
01373 "ImR: Ping verification skipped. <%s> not startable.\n", info.name.c_str ()));
01374 }
01375 return 1;
01376 }
01377
01378 connect_server (info);
01379
01380 if (CORBA::is_nil (info.server.in ()))
01381 {
01382 if (debug_ > 1)
01383 {
01384 ACE_DEBUG ((LM_DEBUG,
01385 "ImR: <%s> Could not connect. alive=false.\n", info.name.c_str ()));
01386 }
01387 return 0;
01388 }
01389
01390 try
01391 {
01392
01393 ImplementationRepository::ServerObject_var server = info.server;
01394
01395
01396 server->ping ();
01397
01398 if (debug_ > 1)
01399 {
01400 ACE_DEBUG ((LM_DEBUG,
01401 "ImR: <%s> Ping successful. alive=true\n", info.name.c_str ()));
01402 }
01403 info.last_ping = ACE_OS::gettimeofday ();
01404 }
01405 catch (const CORBA::TRANSIENT& ex)
01406 {
01407 const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80;
01408 switch (ex.minor () & BITS_5_THRU_12_MASK)
01409 {
01410 case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
01411 {
01412 if (debug_ > 1)
01413 {
01414 ACE_DEBUG ((LM_DEBUG,
01415 "ImR: <%s> Local TRANSIENT. alive=false.\n", info.name.c_str ()));
01416 }
01417 }
01418 info.last_ping = ACE_Time_Value::zero;
01419 return 0;
01420 case TAO_POA_DISCARDING:
01421 case TAO_POA_HOLDING:
01422 {
01423 if (debug_ > 1)
01424 {
01425 ACE_DEBUG ((LM_DEBUG,
01426 "ImR: <%s> Remote TRANSIENT. alive=maybe.\n", info.name.c_str ()));
01427 }
01428 }
01429 return -1;
01430
01431
01432 default:
01433 {
01434 if (debug_ > 1)
01435 {
01436 ACE_DEBUG ((LM_DEBUG,
01437 "ImR: <%s> TRANSIENT exception. alive=false.\n", info.name.c_str ()));
01438 }
01439 info.last_ping = ACE_Time_Value::zero;
01440 }
01441 return 0;
01442 }
01443 }
01444 catch (const CORBA::TIMEOUT&)
01445 {
01446 if (debug_ > 1)
01447 {
01448 ACE_DEBUG ((LM_DEBUG,
01449 "ImR: <%s> Ping timed out. alive=true.\n", info.name.c_str ()));
01450 }
01451 return 1;
01452
01453
01454
01455 }
01456 catch (const CORBA::Exception& ex)
01457 {
01458 if (debug_ > 1)
01459 {
01460 ACE_DEBUG ((LM_DEBUG, "ImR: <%s> Unexpected Ping exception. alive=false\n", info.name.c_str ()));
01461 ex._tao_print_exception ("\n");
01462 }
01463 info.last_ping = ACE_Time_Value::zero;
01464 return false;
01465 }
01466 return 1;
01467 }
01468
01469 int
01470 ImR_Locator_i::debug () const
01471 {
01472 return debug_;
01473 }