00001
00002 #include "tao/Transport_Cache_Manager.h"
00003 #include "tao/Transport.h"
00004 #include "tao/debug.h"
00005 #include "tao/ORB_Core.h"
00006 #include "tao/Connection_Purging_Strategy.h"
00007 #include "tao/Client_Strategy_Factory.h"
00008 #include "tao/Condition.h"
00009 #include "tao/Wait_Strategy.h"
00010 #include "ace/ACE.h"
00011 #include "ace/Reactor.h"
00012
00013 #if !defined (__ACE_INLINE__)
00014 # include "tao/Transport_Cache_Manager.inl"
00015 #endif
00016
00017
00018 ACE_RCSID (tao,
00019 Transport_Cache_Manager,
00020 "$Id: Transport_Cache_Manager.cpp 79388 2007-08-17 16:05:00Z wilsond $")
00021
00022
00023 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00024
00025 namespace TAO
00026 {
00027 Transport_Cache_Manager::Transport_Cache_Manager (TAO_ORB_Core &orb_core)
00028 : percent_ (orb_core.resource_factory ()->purge_percentage ())
00029 , purging_strategy_ (orb_core.resource_factory ()->create_purging_strategy ())
00030 , cache_map_ (orb_core.resource_factory ()->cache_maximum ())
00031 , condition_ (0)
00032 , cache_lock_ (0)
00033 , muxed_number_ (orb_core.resource_factory ()->max_muxed_connections ())
00034 , no_waiting_threads_ (0)
00035 , last_entry_returned_ (0)
00036 {
00037 if (orb_core.resource_factory ()->locked_transport_cache ())
00038 {
00039 ACE_NEW (this->condition_,
00040 TAO_Condition <TAO_SYNCH_MUTEX>);
00041
00042 ACE_NEW (this->cache_lock_,
00043 ACE_Lock_Adapter <TAO_SYNCH_MUTEX> (*this->condition_->mutex ()));
00044 }
00045 else
00046 {
00047
00048
00049
00050 this->muxed_number_ = 0;
00051 ACE_NEW (this->cache_lock_,
00052 ACE_Lock_Adapter<ACE_SYNCH_NULL_MUTEX>);
00053 }
00054 }
00055
00056 Transport_Cache_Manager::~Transport_Cache_Manager (void)
00057 {
00058
00059 if (this->no_waiting_threads_)
00060 {
00061 this->condition_->broadcast ();
00062 }
00063
00064
00065 if (this->cache_lock_)
00066 {
00067 delete this->cache_lock_;
00068 this->cache_lock_ = 0;
00069 }
00070
00071
00072 if (this->purging_strategy_)
00073 {
00074 delete this->purging_strategy_;
00075 this->purging_strategy_ = 0;
00076 }
00077
00078
00079 if (this->condition_)
00080 {
00081 delete this->condition_;
00082 this->condition_ = 0;
00083 }
00084 }
00085
00086
00087 int
00088 Transport_Cache_Manager::bind_i (Cache_ExtId &ext_id,
00089 Cache_IntId &int_id)
00090 {
00091 if (TAO_debug_level > 0)
00092 {
00093 ACE_DEBUG ((LM_INFO,
00094 ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::bind_i: ")
00095 ACE_TEXT ("Transport[%d] ;hash %d\n"),
00096 int_id.transport ()->id (),
00097 ext_id.hash ()
00098 ));
00099 }
00100
00101
00102 HASH_MAP_ENTRY *entry = 0;
00103
00104
00105
00106 this->purging_strategy_->update_item (int_id.transport ());
00107
00108 int retval = this->cache_map_.bind (ext_id,
00109 int_id,
00110 entry);
00111 if (retval == 0)
00112 {
00113
00114
00115 int_id.transport ()->cache_map_entry (entry);
00116 }
00117 else if (retval == 1)
00118 {
00119 if (TAO_debug_level > 4)
00120 {
00121 ACE_DEBUG ((LM_DEBUG,
00122 "TAO (%P|%t) - Transport_Cache_Manager::bind_i, "
00123 "unable to bind in the first attempt. "
00124 "Trying with a new index\n"));
00125 }
00126
00127
00128
00129 retval = this->get_last_index_bind (ext_id,
00130 int_id,
00131 entry);
00132 if (retval == 0)
00133 {
00134 int_id.transport ()->cache_map_entry (entry);
00135 }
00136 }
00137
00138 if (TAO_debug_level > 5 && retval != 0)
00139 {
00140 ACE_ERROR ((LM_ERROR,
00141 "TAO (%P|%t) - Transport_Cache_Manager::bind_i, "
00142 "unable to bind\n"));
00143 }
00144 else if (TAO_debug_level > 3)
00145 {
00146 ACE_DEBUG ((LM_DEBUG,
00147 "TAO (%P|%t) - Transport_Cache_Manager::bind_i, "
00148 "cache size is [%d]\n",
00149 this->current_size ()));
00150 }
00151
00152 return retval;
00153 }
00154
00155 int
00156 Transport_Cache_Manager::find_transport (
00157 TAO_Transport_Descriptor_Interface *prop,
00158 TAO_Transport *&transport)
00159 {
00160 if (prop == 0)
00161 {
00162 transport = 0;
00163 return -1;
00164 }
00165
00166
00167 Cache_ExtId ext_id (prop);
00168 Cache_IntId int_id;
00169
00170 int const retval = this->find (ext_id,
00171 int_id);
00172 if (retval == 0)
00173 {
00174 transport = int_id.relinquish_transport ();
00175
00176 if (transport->wait_strategy ()->non_blocking () == 0 &&
00177 transport->orb_core ()->client_factory ()->use_cleanup_options ())
00178 {
00179 ACE_Event_Handler * const eh =
00180 transport->event_handler_i ();
00181
00182 ACE_Reactor * const r =
00183 transport->orb_core ()->reactor ();
00184
00185 if (eh &&
00186 r->remove_handler (eh,
00187 ACE_Event_Handler::READ_MASK |
00188 ACE_Event_Handler::DONT_CALL) == -1)
00189 {
00190 if (TAO_debug_level > 0)
00191 ACE_ERROR ((LM_ERROR,
00192 ACE_TEXT ("TAO (%P|%t) - TAO_Transport_Cache_Manager[%d]")
00193 ACE_TEXT ("::find_transport, remove_handler failed \n"),
00194 transport->id ()));
00195 }
00196 else
00197 {
00198 transport->wait_strategy ()->is_registered (false);
00199 }
00200 }
00201 }
00202
00203 return retval;
00204 }
00205
00206 int
00207 Transport_Cache_Manager::find (const Cache_ExtId &key,
00208 Cache_IntId &value)
00209 {
00210 ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
00211 guard,
00212 *this->cache_lock_,
00213 -1));
00214
00215 int const status = this->find_i (key,
00216 value);
00217
00218 if (status == 0)
00219 {
00220
00221
00222 this->purging_strategy_->update_item (value.transport ());
00223 }
00224
00225 return status;
00226 }
00227
00228 int
00229 Transport_Cache_Manager::find_i (const Cache_ExtId &key,
00230 Cache_IntId &value)
00231 {
00232 HASH_MAP_ENTRY *entry = 0;
00233
00234
00235 int retval = 0;
00236
00237
00238 Cache_ExtId tmp_key (key.property ());
00239
00240 while (retval == 0)
00241 {
00242
00243 this->wait_for_connection (tmp_key);
00244
00245
00246 retval = this->cache_map_.find (tmp_key,
00247 entry);
00248
00249
00250 if (entry)
00251 {
00252 CORBA::Boolean idle =
00253 this->is_entry_idle (entry);
00254
00255 if (idle)
00256 {
00257
00258
00259 entry->int_id_.recycle_state (ENTRY_BUSY);
00260
00261
00262
00263
00264 value = entry->int_id_;
00265
00266 if (TAO_debug_level > 4)
00267 {
00268 ACE_DEBUG ((LM_DEBUG,
00269 ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::find_i, ")
00270 ACE_TEXT("at index %d (Transport[%d]) - idle\n"),
00271 entry->ext_id_.index (),
00272 entry->int_id_.transport ()->id ()));
00273 }
00274
00275 return 0;
00276 }
00277 else if (TAO_debug_level > 6)
00278 {
00279 ACE_DEBUG ((LM_DEBUG,
00280 ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::find_i, ")
00281 ACE_TEXT("at index %d (Transport[%d]) - not idle\n"),
00282 entry->ext_id_.index (),
00283 entry->int_id_.transport ()->id ()));
00284 }
00285 }
00286
00287
00288 tmp_key.incr_index ();
00289 }
00290
00291
00292 if (TAO_debug_level > 4 && retval != 0)
00293 {
00294 ACE_ERROR ((LM_ERROR,
00295 "TAO (%P|%t) - Transport_Cache_Manager::find_i, "
00296 "no idle transport is available\n"));
00297 }
00298
00299 return retval;
00300 }
00301
00302 int
00303 Transport_Cache_Manager::make_idle_i (HASH_MAP_ENTRY *&entry)
00304 {
00305 if (entry == 0)
00306 return -1;
00307
00308 entry->int_id_.recycle_state (ENTRY_IDLE_AND_PURGABLE);
00309
00310
00311 if (this->no_waiting_threads_)
00312 {
00313
00314 this->last_entry_returned_ = &entry->ext_id_;
00315
00316
00317 this->condition_->signal ();
00318 }
00319
00320 return 0;
00321 }
00322
00323 int
00324 Transport_Cache_Manager::update_entry (HASH_MAP_ENTRY *&entry)
00325 {
00326 if(entry == 0)
00327 return -1;
00328
00329 ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
00330 guard,
00331 *this->cache_lock_, -1));
00332
00333 if (entry == 0)
00334 return -1;
00335
00336 TAO_Connection_Purging_Strategy *st = this->purging_strategy_;
00337 (void) st->update_item (entry->int_id_.transport ());
00338
00339 return 0;
00340 }
00341
00342 int
00343 Transport_Cache_Manager::close_i (Connection_Handler_Set &handlers)
00344 {
00345 HASH_MAP_ITER end_iter = this->cache_map_.end ();
00346
00347 for (HASH_MAP_ITER iter = this->cache_map_.begin ();
00348 iter != end_iter;
00349 ++iter)
00350 {
00351
00352 (*iter).int_id_.transport ()->provide_handler (handlers);
00353
00354
00355
00356
00357
00358 (*iter).int_id_.transport ()->cache_map_entry (0);
00359 }
00360
00361
00362 this->cache_map_.unbind_all ();
00363
00364 return 0;
00365 }
00366
00367 bool
00368 Transport_Cache_Manager::blockable_client_transports_i (
00369 Connection_Handler_Set &h)
00370 {
00371 HASH_MAP_ITER end_iter = this->cache_map_.end ();
00372
00373 for (HASH_MAP_ITER iter = this->cache_map_.begin ();
00374 iter != end_iter;
00375 ++iter)
00376 {
00377
00378
00379 bool retval =
00380 (*iter).int_id_.transport ()->provide_blockable_handler (h);
00381
00382
00383
00384 if (retval)
00385 (*iter).int_id_.recycle_state (ENTRY_CLOSED);
00386 }
00387
00388 return true;
00389 }
00390
00391 int
00392 Transport_Cache_Manager::purge_entry_i (HASH_MAP_ENTRY *&entry)
00393 {
00394 if (entry == 0)
00395 {
00396 return 0;
00397 }
00398
00399
00400 int retval = this->cache_map_.unbind (entry);
00401
00402
00403 entry = 0;
00404
00405 return retval;
00406 }
00407
00408 void
00409 Transport_Cache_Manager::mark_invalid_i (HASH_MAP_ENTRY *&entry)
00410 {
00411 if (entry == 0)
00412 {
00413 return;
00414 }
00415
00416
00417 entry->int_id_.recycle_state (ENTRY_PURGABLE_BUT_NOT_IDLE);
00418 }
00419
00420 int
00421 Transport_Cache_Manager::get_last_index_bind (Cache_ExtId &key,
00422 Cache_IntId &val,
00423 HASH_MAP_ENTRY *&entry)
00424 {
00425 CORBA::ULong ctr = entry->ext_id_.index ();
00426 int retval = 0;
00427
00428 while (retval == 0)
00429 {
00430
00431 key.index (++ctr);
00432
00433
00434
00435 retval = this->cache_map_.find (key);
00436 }
00437
00438
00439 return this->cache_map_.bind (key,
00440 val,
00441 entry);
00442 }
00443
00444
00445 bool
00446 Transport_Cache_Manager::is_entry_idle (HASH_MAP_ENTRY *&entry)
00447 {
00448 Cache_Entries_State entry_state =
00449 entry->int_id_.recycle_state ();
00450
00451 if (TAO_debug_level)
00452 {
00453 ACE_DEBUG ((LM_DEBUG,
00454 ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::is_entry_idle, ")
00455 ACE_TEXT("state is [%d]\n"),
00456 entry_state));
00457 }
00458
00459 if (entry_state == ENTRY_IDLE_AND_PURGABLE ||
00460 entry_state == ENTRY_IDLE_BUT_NOT_PURGABLE)
00461 return true;
00462
00463 return false;
00464 }
00465
00466 #if !defined (ACE_LACKS_QSORT)
00467 int
00468 Transport_Cache_Manager::cpscmp(const void* a, const void* b)
00469 {
00470 const HASH_MAP_ENTRY** left = (const HASH_MAP_ENTRY**)a;
00471 const HASH_MAP_ENTRY** right = (const HASH_MAP_ENTRY**)b;
00472
00473 if ((*left)->int_id_.transport ()->purging_order () <
00474 (*right)->int_id_.transport ()->purging_order ())
00475 return -1;
00476
00477 if ((*left)->int_id_.transport ()->purging_order () >
00478 (*right)->int_id_.transport ()->purging_order ())
00479 return 1;
00480
00481 return 0;
00482 }
00483 #endif
00484
00485 int
00486 Transport_Cache_Manager::purge (void)
00487 {
00488 ACE_Unbounded_Stack<TAO_Transport*> transports_to_be_closed;
00489
00490 {
00491 ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->cache_lock_, 0));
00492
00493 DESCRIPTOR_SET sorted_set = 0;
00494 int sorted_size = this->fill_set_i (sorted_set);
00495
00496
00497
00498
00499 if (sorted_set != 0)
00500 {
00501
00502
00503 const int amount = (sorted_size * this->percent_) / 100;
00504
00505 if (TAO_debug_level > 0)
00506 {
00507 ACE_DEBUG ((LM_DEBUG,
00508 ACE_TEXT ("TAO (%P|%t) - Transport_Cache_Manager::")
00509 ACE_TEXT ("purge, purging %d of %d cache entries\n"),
00510 amount,
00511 sorted_size));
00512 }
00513
00514 int count = 0;
00515
00516 for (int i = 0; count < amount && i < sorted_size; ++i)
00517 {
00518 if (this->is_entry_idle (sorted_set[i]))
00519 {
00520 sorted_set[i]->int_id_.recycle_state (ENTRY_BUSY);
00521
00522 TAO_Transport* transport =
00523 sorted_set[i]->int_id_.transport ();
00524 transport->add_reference ();
00525
00526 if (transports_to_be_closed.push (transport) != 0)
00527 {
00528 ACE_DEBUG ((LM_INFO,
00529 ACE_TEXT ("TAO (%P|%t) - ")
00530 ACE_TEXT ("Unable to push transport %u ")
00531 ACE_TEXT ("on the to-be-closed stack, so ")
00532 ACE_TEXT ("it will leak\n"),
00533 transport->id ()));
00534 }
00535
00536 if (TAO_debug_level > 0)
00537 {
00538 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - ")
00539 ACE_TEXT ("Idle transport found in ")
00540 ACE_TEXT ("cache: [%d] \n"),
00541 transport->id ()));
00542 }
00543
00544
00545 count++;
00546 }
00547 }
00548
00549 delete [] sorted_set;
00550 sorted_set = 0;
00551
00552 }
00553 }
00554
00555
00556 TAO_Transport *transport = 0;
00557
00558 while (! transports_to_be_closed.is_empty ())
00559 {
00560 if (transports_to_be_closed.pop (transport) == 0)
00561 {
00562 if (transport)
00563 {
00564 transport->close_connection ();
00565 transport->remove_reference ();
00566 }
00567 }
00568 }
00569
00570 return 0;
00571 }
00572
00573
00574 void
00575 Transport_Cache_Manager::sort_set (DESCRIPTOR_SET& entries,
00576 int current_size)
00577 {
00578 #if defined (ACE_LACKS_QSORT)
00579
00580 for(int i = 1; i < current_size; ++i)
00581 {
00582 if (entries[i]->int_id_.transport ()->purging_order () <
00583 entries[i - 1]->int_id_.transport ()->purging_order ())
00584 {
00585 HASH_MAP_ENTRY* entry = entries[i];
00586
00587 for(int j = i; j > 0 &&
00588 entries[j - 1]->int_id_.transport ()->purging_order () >
00589 entry->int_id_.transport ()->purging_order (); --j)
00590 {
00591 HASH_MAP_ENTRY* holder = entries[j];
00592 entries[j] = entries[j - 1];
00593 entries[j - 1] = holder;
00594 }
00595 }
00596 }
00597 #else
00598 ACE_OS::qsort (entries, current_size,
00599 sizeof (HASH_MAP_ENTRY*), (ACE_COMPARE_FUNC)cpscmp);
00600 #endif
00601 }
00602
00603
00604 int
00605 Transport_Cache_Manager::fill_set_i (DESCRIPTOR_SET& sorted_set)
00606 {
00607 int current_size = 0;
00608 int cache_maximum = this->purging_strategy_->cache_maximum ();
00609
00610
00611 sorted_set = 0;
00612
00613
00614 if (cache_maximum >= 0)
00615 {
00616 current_size = static_cast<int> (this->cache_map_.current_size ());
00617
00618 if (TAO_debug_level > 0)
00619 {
00620 ACE_DEBUG ((LM_DEBUG,
00621 ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::fill_set_i, ")
00622 ACE_TEXT("current_size = %d, cache_maximum = %d\n"),
00623 current_size, cache_maximum));
00624 }
00625
00626 if (current_size >= cache_maximum)
00627 {
00628 ACE_NEW_RETURN (sorted_set, HASH_MAP_ENTRY*[current_size], 0);
00629
00630 HASH_MAP_ITER iter = this->cache_map_.begin ();
00631
00632 for (int i = 0; i < current_size; ++i)
00633 {
00634 sorted_set[i] = &(*iter);
00635 iter++;
00636 }
00637
00638 this->sort_set (sorted_set, current_size);
00639 }
00640 }
00641
00642 return current_size;
00643 }
00644
00645
00646 int
00647 Transport_Cache_Manager::wait_for_connection (Cache_ExtId &extid)
00648 {
00649 if (this->muxed_number_ && this->muxed_number_ == extid.index ())
00650 {
00651
00652
00653 ++this->no_waiting_threads_;
00654
00655 if (TAO_debug_level > 2)
00656 {
00657 ACE_DEBUG ((LM_DEBUG,
00658 ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager")
00659 ACE_TEXT("::wait_for_connection, ")
00660 ACE_TEXT("entering wait loop\n")));
00661 }
00662
00663 int ready_togo = 0;
00664
00665 while (ready_togo == 0)
00666 {
00667 this->condition_->wait ();
00668
00669
00670 ready_togo = this->is_wakeup_useful (extid);
00671 }
00672
00673 if (TAO_debug_level > 2)
00674 {
00675 ACE_DEBUG ((LM_DEBUG,
00676 ACE_TEXT("TAO (%P|%t) - Transport_Cache_Manager::wait_for_connection, ")
00677 ACE_TEXT("left wait loop\n")));
00678 }
00679
00680
00681 --this->no_waiting_threads_;
00682 }
00683
00684 return 0;
00685 }
00686
00687 int
00688 Transport_Cache_Manager::is_wakeup_useful (Cache_ExtId &extid)
00689 {
00690
00691 TAO_Transport_Descriptor_Interface *prop = extid.property ();
00692
00693
00694
00695
00696 if (this->last_entry_returned_ &&
00697 prop->is_equivalent (this->last_entry_returned_->property ()))
00698 {
00699
00700
00701 extid.index (this->last_entry_returned_->index ());
00702
00703
00704 this->last_entry_returned_ = 0;
00705
00706 return 1;
00707 }
00708
00709
00710
00711
00712 if (this->last_entry_returned_ &&
00713 this->no_waiting_threads_ > 1)
00714 {
00715 this->condition_->signal ();
00716 }
00717
00718 return 0;
00719 }
00720
00721 }
00722
00723 TAO_END_VERSIONED_NAMESPACE_DECL