00001
00002 #include "tao/Transport_Cache_Manager.h"
00003 #include "tao/Transport.h"
00004 #include "tao/debug.h"
00005 #include "tao/ORB_Core.h"
00006 #include "tao/Connection_Purging_Strategy.h"
00007 #include "tao/Client_Strategy_Factory.h"
00008 #include "tao/Condition.h"
00009 #include "tao/Wait_Strategy.h"
00010 #include "ace/ACE.h"
00011 #include "ace/Reactor.h"
00012
00013 #if !defined (__ACE_INLINE__)
00014 # include "tao/Transport_Cache_Manager.inl"
00015 #endif
00016
00017 ACE_RCSID (tao,
00018 Transport_Cache_Manager,
00019 "$Id: Transport_Cache_Manager.cpp 81691 2008-05-14 11:09:21Z johnnyw $")
00020
00021 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00022
00023 namespace TAO
00024 {
00025 Transport_Cache_Manager::Transport_Cache_Manager (TAO_ORB_Core &orb_core)
00026 : percent_ (orb_core.resource_factory ()->purge_percentage ())
00027 , purging_strategy_ (orb_core.resource_factory ()->create_purging_strategy ())
00028 , cache_map_ (orb_core.resource_factory ()->cache_maximum ())
00029 , condition_ (0)
00030 , cache_lock_ (0)
00031 , muxed_number_ (orb_core.resource_factory ()->max_muxed_connections ())
00032 , no_waiting_threads_ (0)
00033 , last_entry_returned_ (0)
00034 {
00035 if (orb_core.resource_factory ()->locked_transport_cache ())
00036 {
00037 ACE_NEW (this->condition_,
00038 TAO_Condition <TAO_SYNCH_MUTEX>);
00039
00040 ACE_NEW (this->cache_lock_,
00041 ACE_Lock_Adapter <TAO_SYNCH_MUTEX> (*this->condition_->mutex ()));
00042 }
00043 else
00044 {
00045
00046
00047
00048 this->muxed_number_ = 0;
00049 ACE_NEW (this->cache_lock_,
00050 ACE_Lock_Adapter<ACE_SYNCH_NULL_MUTEX>);
00051 }
00052
00053 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
00054 ACE_NEW (this->purge_monitor_,
00055 ACE::Monitor_Control::Size_Monitor ("Connection_Cache_Purge"));
00056 ACE_NEW (this->size_monitor_,
00057 ACE::Monitor_Control::Size_Monitor ("Connection_Cache_Size"));
00058 this->purge_monitor_->add_to_registry ();
00059 this->size_monitor_->add_to_registry ();
00060 #endif
00061 }
00062
00063 Transport_Cache_Manager::~Transport_Cache_Manager (void)
00064 {
00065
00066 if (this->no_waiting_threads_)
00067 {
00068 this->condition_->broadcast ();
00069 }
00070
00071
00072 if (this->cache_lock_)
00073 {
00074 delete this->cache_lock_;
00075 this->cache_lock_ = 0;
00076 }
00077
00078
00079 if (this->purging_strategy_)
00080 {
00081 delete this->purging_strategy_;
00082 this->purging_strategy_ = 0;
00083 }
00084
00085
00086 if (this->condition_)
00087 {
00088 delete this->condition_;
00089 this->condition_ = 0;
00090 }
00091
00092 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
00093 this->purge_monitor_->remove_from_registry ();
00094 this->size_monitor_->remove_from_registry ();
00095 this->purge_monitor_->remove_ref ();
00096 this->size_monitor_->remove_ref ();
00097 #endif
00098 }
00099
00100
00101 int
00102 Transport_Cache_Manager::bind_i (Cache_ExtId &ext_id,
00103 Cache_IntId &int_id)
00104 {
00105 if (TAO_debug_level > 0)
00106 {
00107 ACE_DEBUG ((LM_INFO,
00108 ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::bind_i, ")
00109 ACE_TEXT ("Transport[%d]; hash %d\n"),
00110 int_id.transport ()->id (),
00111 ext_id.hash ()
00112 ));
00113 }
00114
00115
00116 HASH_MAP_ENTRY *entry = 0;
00117
00118
00119
00120 this->purging_strategy_->update_item (int_id.transport ());
00121
00122 int retval = this->cache_map_.bind (ext_id, int_id, entry);
00123 if (retval == 0)
00124 {
00125
00126
00127 int_id.transport ()->cache_map_entry (entry);
00128 }
00129 else if (retval == 1)
00130 {
00131 if (TAO_debug_level > 4)
00132 {
00133 ACE_DEBUG ((LM_DEBUG,
00134 "TAO (%P|%t) - Transport_Cache_Manager::bind_i, "
00135 "unable to bind in the first attempt. "
00136 "Trying with a new index\n"));
00137 }
00138
00139
00140
00141 retval = this->get_last_index_bind (ext_id, int_id, entry);
00142 if (retval == 0)
00143 {
00144 int_id.transport ()->cache_map_entry (entry);
00145 }
00146 }
00147
00148 if (TAO_debug_level > 5 && retval != 0)
00149 {
00150 ACE_ERROR ((LM_ERROR,
00151 "TAO (%P|%t) - Transport_Cache_Manager::bind_i, "
00152 "unable to bind\n"));
00153 }
00154 else if (TAO_debug_level > 3)
00155 {
00156 ACE_DEBUG ((LM_DEBUG,
00157 "TAO (%P|%t) - Transport_Cache_Manager::bind_i, "
00158 "cache size is [%d]\n",
00159 this->current_size ()));
00160 }
00161
00162 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
00163 this->size_monitor_->receive (this->current_size ());
00164 #endif
00165
00166 return retval;
00167 }
00168
00169 int
00170 Transport_Cache_Manager::find_transport (
00171 TAO_Transport_Descriptor_Interface *prop,
00172 TAO_Transport *&transport)
00173 {
00174 if (prop == 0)
00175 {
00176 transport = 0;
00177 return -1;
00178 }
00179
00180
00181 Cache_ExtId ext_id (prop);
00182 Cache_IntId int_id;
00183
00184 int const retval = this->find (ext_id, int_id);
00185 if (retval == 0)
00186 {
00187 transport = int_id.relinquish_transport ();
00188
00189 if (transport->wait_strategy ()->non_blocking () == 0 &&
00190 transport->orb_core ()->client_factory ()->use_cleanup_options ())
00191 {
00192 ACE_Event_Handler * const eh = transport->event_handler_i ();
00193 ACE_Reactor * const r = transport->orb_core ()->reactor ();
00194
00195 if (eh &&
00196 r->remove_handler (eh,
00197 ACE_Event_Handler::READ_MASK |
00198 ACE_Event_Handler::DONT_CALL) == -1)
00199 {
00200 if (TAO_debug_level > 0)
00201 ACE_ERROR ((LM_ERROR,
00202 ACE_TEXT ("TAO (%P|%t) - TAO_Transport_Cache_Manager[%d]")
00203 ACE_TEXT ("::find_transport, remove_handler failed \n"),
00204 transport->id ()));
00205 }
00206 else
00207 {
00208 transport->wait_strategy ()->is_registered (false);
00209 }
00210 }
00211 }
00212
00213 return retval;
00214 }
00215
00216 int
00217 Transport_Cache_Manager::find (const Cache_ExtId &key,
00218 Cache_IntId &value)
00219 {
00220 ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
00221 guard,
00222 *this->cache_lock_,
00223 -1));
00224
00225 int const status = this->find_i (key, value);
00226
00227 if (status == 0)
00228 {
00229
00230
00231 this->purging_strategy_->update_item (value.transport ());
00232 }
00233
00234 return status;
00235 }
00236
00237 int
00238 Transport_Cache_Manager::find_i (const Cache_ExtId &key,
00239 Cache_IntId &value)
00240 {
00241 HASH_MAP_ENTRY *entry = 0;
00242
00243
00244 int retval = 0;
00245
00246
00247 Cache_ExtId tmp_key (key.property ());
00248
00249 while (retval == 0)
00250 {
00251
00252 this->wait_for_connection (tmp_key);
00253
00254
00255 retval = this->cache_map_.find (tmp_key, entry);
00256
00257
00258 if (entry)
00259 {
00260 CORBA::Boolean const idle = this->is_entry_idle (entry);
00261
00262 if (idle)
00263 {
00264
00265
00266 entry->int_id_.recycle_state (ENTRY_BUSY);
00267
00268
00269
00270
00271 value = entry->int_id_;
00272
00273 if (TAO_debug_level > 4)
00274 {
00275 ACE_DEBUG ((LM_DEBUG,
00276 ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::find_i, ")
00277 ACE_TEXT("at index %d (Transport[%d]) - idle\n"),
00278 entry->ext_id_.index (),
00279 entry->int_id_.transport ()->id ()));
00280 }
00281
00282 return 0;
00283 }
00284 else if (TAO_debug_level > 6)
00285 {
00286 ACE_DEBUG ((LM_DEBUG,
00287 ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::find_i, ")
00288 ACE_TEXT("at index %d (Transport[%d]) - not idle\n"),
00289 entry->ext_id_.index (),
00290 entry->int_id_.transport ()->id ()));
00291 }
00292 }
00293
00294
00295 tmp_key.incr_index ();
00296 }
00297
00298
00299 if (TAO_debug_level > 4 && retval != 0)
00300 {
00301 ACE_ERROR ((LM_ERROR,
00302 "TAO (%P|%t) - Transport_Cache_Manager::find_i, "
00303 "no idle transport is available\n"));
00304 }
00305
00306 return retval;
00307 }
00308
00309 int
00310 Transport_Cache_Manager::make_idle_i (HASH_MAP_ENTRY *&entry)
00311 {
00312 if (entry == 0)
00313 return -1;
00314
00315 entry->int_id_.recycle_state (ENTRY_IDLE_AND_PURGABLE);
00316
00317
00318 if (this->no_waiting_threads_)
00319 {
00320
00321 this->last_entry_returned_ = &entry->ext_id_;
00322
00323
00324 this->condition_->signal ();
00325 }
00326
00327 return 0;
00328 }
00329
00330 int
00331 Transport_Cache_Manager::update_entry (HASH_MAP_ENTRY *&entry)
00332 {
00333 if(entry == 0)
00334 return -1;
00335
00336 ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
00337 guard,
00338 *this->cache_lock_, -1));
00339
00340 if (entry == 0)
00341 return -1;
00342
00343 TAO_Connection_Purging_Strategy *st = this->purging_strategy_;
00344 (void) st->update_item (entry->int_id_.transport ());
00345
00346 return 0;
00347 }
00348
00349 int
00350 Transport_Cache_Manager::close_i (Connection_Handler_Set &handlers)
00351 {
00352 HASH_MAP_ITER end_iter = this->cache_map_.end ();
00353
00354 for (HASH_MAP_ITER iter = this->cache_map_.begin ();
00355 iter != end_iter;
00356 ++iter)
00357 {
00358
00359 (*iter).int_id_.transport ()->provide_handler (handlers);
00360
00361
00362
00363
00364
00365 (*iter).int_id_.transport ()->cache_map_entry (0);
00366 }
00367
00368
00369 this->cache_map_.unbind_all ();
00370
00371 return 0;
00372 }
00373
00374 bool
00375 Transport_Cache_Manager::blockable_client_transports_i (
00376 Connection_Handler_Set &h)
00377 {
00378 HASH_MAP_ITER end_iter = this->cache_map_.end ();
00379
00380 for (HASH_MAP_ITER iter = this->cache_map_.begin ();
00381 iter != end_iter;
00382 ++iter)
00383 {
00384
00385
00386 bool const retval =
00387 (*iter).int_id_.transport ()->provide_blockable_handler (h);
00388
00389
00390
00391 if (retval)
00392 (*iter).int_id_.recycle_state (ENTRY_CLOSED);
00393 }
00394
00395 return true;
00396 }
00397
00398 int
00399 Transport_Cache_Manager::purge_entry_i (HASH_MAP_ENTRY *&entry)
00400 {
00401 if (entry == 0)
00402 {
00403 return 0;
00404 }
00405
00406
00407 int const retval = this->cache_map_.unbind (entry);
00408
00409
00410 entry = 0;
00411
00412 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
00413 this->size_monitor_->receive (this->current_size ());
00414 #endif
00415
00416 return retval;
00417 }
00418
00419 void
00420 Transport_Cache_Manager::mark_invalid_i (HASH_MAP_ENTRY *&entry)
00421 {
00422 if (entry == 0)
00423 {
00424 return;
00425 }
00426
00427
00428 entry->int_id_.recycle_state (ENTRY_PURGABLE_BUT_NOT_IDLE);
00429 }
00430
00431 int
00432 Transport_Cache_Manager::get_last_index_bind (Cache_ExtId &key,
00433 Cache_IntId &val,
00434 HASH_MAP_ENTRY *&entry)
00435 {
00436 CORBA::ULong ctr = entry->ext_id_.index ();
00437 int retval = 0;
00438
00439 while (retval == 0)
00440 {
00441
00442 key.index (++ctr);
00443
00444
00445
00446 retval = this->cache_map_.find (key);
00447 }
00448
00449
00450 return this->cache_map_.bind (key, val, entry);
00451
00452 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
00453 this->size_monitor_->receive (this->current_size ());
00454 #endif
00455 }
00456
00457
00458 bool
00459 Transport_Cache_Manager::is_entry_idle (HASH_MAP_ENTRY *&entry)
00460 {
00461 Cache_Entries_State entry_state =
00462 entry->int_id_.recycle_state ();
00463
00464 if (TAO_debug_level)
00465 {
00466 ACE_DEBUG ((LM_DEBUG,
00467 ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::is_entry_idle, ")
00468 ACE_TEXT("state is [%d]\n"),
00469 entry_state));
00470 }
00471
00472 if (entry_state == ENTRY_IDLE_AND_PURGABLE ||
00473 entry_state == ENTRY_IDLE_BUT_NOT_PURGABLE)
00474 return true;
00475
00476 return false;
00477 }
00478
00479 #if !defined (ACE_LACKS_QSORT)
00480 int
00481 Transport_Cache_Manager::cpscmp(const void* a, const void* b)
00482 {
00483 const HASH_MAP_ENTRY** left = (const HASH_MAP_ENTRY**)a;
00484 const HASH_MAP_ENTRY** right = (const HASH_MAP_ENTRY**)b;
00485
00486 if ((*left)->int_id_.transport ()->purging_order () <
00487 (*right)->int_id_.transport ()->purging_order ())
00488 return -1;
00489
00490 if ((*left)->int_id_.transport ()->purging_order () >
00491 (*right)->int_id_.transport ()->purging_order ())
00492 return 1;
00493
00494 return 0;
00495 }
00496 #endif
00497
00498 int
00499 Transport_Cache_Manager::purge (void)
00500 {
00501 ACE_Unbounded_Stack<TAO_Transport*> transports_to_be_closed;
00502
00503 {
00504 ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->cache_lock_, 0));
00505
00506 DESCRIPTOR_SET sorted_set = 0;
00507 int sorted_size = this->fill_set_i (sorted_set);
00508
00509
00510
00511
00512 if (sorted_set != 0)
00513 {
00514
00515
00516 int const amount = (sorted_size * this->percent_) / 100;
00517
00518 if (TAO_debug_level > 0)
00519 {
00520 ACE_DEBUG ((LM_DEBUG,
00521 ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::")
00522 ACE_TEXT ("purge, purging %d of %d cache entries\n"),
00523 amount,
00524 sorted_size));
00525 }
00526
00527 int count = 0;
00528
00529 for (int i = 0; count < amount && i < sorted_size; ++i)
00530 {
00531 if (this->is_entry_idle (sorted_set[i]))
00532 {
00533 sorted_set[i]->int_id_.recycle_state (ENTRY_BUSY);
00534
00535 TAO_Transport* transport =
00536 sorted_set[i]->int_id_.transport ();
00537 transport->add_reference ();
00538
00539 if (transports_to_be_closed.push (transport) != 0)
00540 {
00541 ACE_DEBUG ((LM_INFO,
00542 ACE_TEXT ("TAO (%P|%t) - ")
00543 ACE_TEXT ("Unable to push transport %u ")
00544 ACE_TEXT ("on the to-be-closed stack, so ")
00545 ACE_TEXT ("it will leak\n"),
00546 transport->id ()));
00547 }
00548
00549 if (TAO_debug_level > 0)
00550 {
00551 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - ")
00552 ACE_TEXT ("Idle transport found in ")
00553 ACE_TEXT ("cache: [%d] \n"),
00554 transport->id ()));
00555 }
00556
00557
00558 ++count;
00559 }
00560 }
00561
00562 delete [] sorted_set;
00563 sorted_set = 0;
00564
00565 }
00566 }
00567
00568
00569 TAO_Transport *transport = 0;
00570
00571 while (! transports_to_be_closed.is_empty ())
00572 {
00573 if (transports_to_be_closed.pop (transport) == 0)
00574 {
00575 if (transport)
00576 {
00577 transport->close_connection ();
00578 transport->remove_reference ();
00579 }
00580 }
00581 }
00582
00583 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
00584
00585
00586 this->purge_monitor_->receive (static_cast<size_t> (0UL));
00587
00588 this->size_monitor_->receive (this->current_size ());
00589 #endif
00590
00591 return 0;
00592 }
00593
00594
00595 void
00596 Transport_Cache_Manager::sort_set (DESCRIPTOR_SET& entries,
00597 int current_size)
00598 {
00599 #if defined (ACE_LACKS_QSORT)
00600
00601 for(int i = 1; i < current_size; ++i)
00602 {
00603 if (entries[i]->int_id_.transport ()->purging_order () <
00604 entries[i - 1]->int_id_.transport ()->purging_order ())
00605 {
00606 HASH_MAP_ENTRY* entry = entries[i];
00607
00608 for(int j = i; j > 0 &&
00609 entries[j - 1]->int_id_.transport ()->purging_order () >
00610 entry->int_id_.transport ()->purging_order (); --j)
00611 {
00612 HASH_MAP_ENTRY* holder = entries[j];
00613 entries[j] = entries[j - 1];
00614 entries[j - 1] = holder;
00615 }
00616 }
00617 }
00618 #else
00619 ACE_OS::qsort (entries, current_size,
00620 sizeof (HASH_MAP_ENTRY*), (ACE_COMPARE_FUNC)cpscmp);
00621 #endif
00622 }
00623
00624
00625 int
00626 Transport_Cache_Manager::fill_set_i (DESCRIPTOR_SET& sorted_set)
00627 {
00628 int current_size = 0;
00629 int const cache_maximum = this->purging_strategy_->cache_maximum ();
00630
00631
00632 sorted_set = 0;
00633
00634
00635 if (cache_maximum >= 0)
00636 {
00637 current_size = static_cast<int> (this->cache_map_.current_size ());
00638
00639 if (TAO_debug_level > 0)
00640 {
00641 ACE_DEBUG ((LM_DEBUG,
00642 ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::fill_set_i, ")
00643 ACE_TEXT("current_size = %d, cache_maximum = %d\n"),
00644 current_size, cache_maximum));
00645 }
00646
00647 if (current_size >= cache_maximum)
00648 {
00649 ACE_NEW_RETURN (sorted_set, HASH_MAP_ENTRY*[current_size], 0);
00650
00651 HASH_MAP_ITER iter = this->cache_map_.begin ();
00652
00653 for (int i = 0; i < current_size; ++i)
00654 {
00655 sorted_set[i] = &(*iter);
00656 iter++;
00657 }
00658
00659 this->sort_set (sorted_set, current_size);
00660 }
00661 }
00662
00663 return current_size;
00664 }
00665
00666
00667 int
00668 Transport_Cache_Manager::wait_for_connection (Cache_ExtId &extid)
00669 {
00670 if (this->muxed_number_ && this->muxed_number_ == extid.index ())
00671 {
00672
00673
00674 ++this->no_waiting_threads_;
00675
00676 if (TAO_debug_level > 2)
00677 {
00678 ACE_DEBUG ((LM_DEBUG,
00679 ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager")
00680 ACE_TEXT("::wait_for_connection, ")
00681 ACE_TEXT("entering wait loop\n")));
00682 }
00683
00684 int ready_togo = 0;
00685
00686 while (ready_togo == 0)
00687 {
00688 this->condition_->wait ();
00689
00690
00691 ready_togo = this->is_wakeup_useful (extid);
00692 }
00693
00694 if (TAO_debug_level > 2)
00695 {
00696 ACE_DEBUG ((LM_DEBUG,
00697 ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::wait_for_connection, ")
00698 ACE_TEXT("left wait loop\n")));
00699 }
00700
00701
00702 --this->no_waiting_threads_;
00703 }
00704
00705 return 0;
00706 }
00707
00708 int
00709 Transport_Cache_Manager::is_wakeup_useful (Cache_ExtId &extid)
00710 {
00711
00712 TAO_Transport_Descriptor_Interface *prop = extid.property ();
00713
00714
00715
00716
00717 if (this->last_entry_returned_ &&
00718 prop->is_equivalent (this->last_entry_returned_->property ()))
00719 {
00720
00721
00722 extid.index (this->last_entry_returned_->index ());
00723
00724
00725 this->last_entry_returned_ = 0;
00726
00727 return 1;
00728 }
00729
00730
00731
00732
00733 if (this->last_entry_returned_ &&
00734 this->no_waiting_threads_ > 1)
00735 {
00736 this->condition_->signal ();
00737 }
00738
00739 return 0;
00740 }
00741
00742 }
00743
00744 TAO_END_VERSIONED_NAMESPACE_DECL