00001
00002
00003 #include "orbsvcs/Notify/Persistent_File_Allocator.h"
00004
00005 #include "tao/debug.h"
00006 #include "ace/OS_NS_string.h"
00007
00008
00009 #ifndef DEBUG_LEVEL
00010 # define DEBUG_LEVEL TAO_debug_level
00011 #endif //DEBUG_LEVEL
00012
00013 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00014
00015 namespace TAO_Notify
00016 {
00017
00018 Persistent_Callback::~Persistent_Callback()
00019 {
00020 }
00021
00022 Persistent_Storage_Block::Persistent_Storage_Block(const size_t block_number,
00023 const size_t block_size)
00024 : block_number_(block_number)
00025 , no_write_(false)
00026 , sync_(false)
00027 , block_size_(block_size)
00028 , callback_(0)
00029 , allocator_owns_(true)
00030 {
00031 ACE_NEW(this->data_, unsigned char[this->block_size_]);
00032 ACE_OS::memset(this->data_, 0, this->block_size_);
00033
00034 }
00035
00036 Persistent_Storage_Block::Persistent_Storage_Block(
00037 const Persistent_Storage_Block& psb)
00038 : block_number_(psb.block_number_)
00039 , no_write_(psb.no_write_)
00040 , sync_(psb.sync_)
00041 , block_size_(psb.block_size_)
00042 , callback_(psb.callback_)
00043 , allocator_owns_(psb.allocator_owns_)
00044 {
00045 ACE_NEW(this->data_, unsigned char[this->block_size_]);
00046 ACE_OS::memcpy(this->data_, psb.data(), this->block_size_);
00047 }
00048
00049 Persistent_Storage_Block::~Persistent_Storage_Block()
00050 {
00051 delete [] this->data_;
00052 this->data_ = 0;
00053 }
00054
00055 void
00056 Persistent_Storage_Block::set_no_write()
00057 {
00058 this->no_write_ = true;
00059 this->reassign_data(0, true);
00060 }
00061
00062 bool
00063 Persistent_Storage_Block::get_no_write()
00064 {
00065 return this->no_write_;
00066 }
00067
00068 void
00069 Persistent_Storage_Block::set_sync()
00070 {
00071 this->sync_ = true;
00072 }
00073
00074 bool
00075 Persistent_Storage_Block::get_sync() const
00076 {
00077 return this->sync_;
00078 }
00079
00080 size_t
00081 Persistent_Storage_Block::block_number() const
00082 {
00083 return this->block_number_;
00084 }
00085
00086 unsigned char*
00087 Persistent_Storage_Block::data() const
00088 {
00089 return this->data_;
00090 }
00091
00092 void
00093 Persistent_Storage_Block::reassign_data(unsigned char* newptr,
00094 bool delete_old)
00095 {
00096 if (delete_old)
00097 {
00098 delete [] this->data_;
00099 }
00100 this->data_ = newptr;
00101 }
00102
00103 void
00104 Persistent_Storage_Block::set_callback(Persistent_Callback* callback)
00105 {
00106 this->callback_ = callback;
00107 }
00108
00109 Persistent_Callback*
00110 Persistent_Storage_Block::get_callback() const
00111 {
00112 return this->callback_;
00113 }
00114
00115 void
00116 Persistent_Storage_Block::set_allocator_owns(bool allocator_owns)
00117 {
00118 this->allocator_owns_ = allocator_owns;
00119 }
00120
00121 bool
00122 Persistent_Storage_Block::get_allocator_owns() const
00123 {
00124 return this->allocator_owns_;
00125 }
00126
00127 Persistent_File_Allocator::Persistent_File_Allocator()
00128 : pstore_()
00129 , terminate_thread_(false)
00130 , thread_active_(false)
00131 , wake_up_thread_(queue_lock_)
00132 {
00133 }
00134
00135 Persistent_File_Allocator::~Persistent_File_Allocator()
00136 {
00137 this->shutdown_thread();
00138 }
00139
00140 bool
00141 Persistent_File_Allocator::open (const ACE_TCHAR* filename,
00142 const size_t block_size)
00143 {
00144 bool file_opened = this->pstore_.open(filename, block_size);
00145 if (file_opened)
00146 {
00147 this->thread_active_ = true;
00148 this->thread_manager_.spawn(this->thr_func, this);
00149 }
00150 return file_opened;
00151 }
00152
00153 void
00154 Persistent_File_Allocator::shutdown()
00155 {
00156 this->shutdown_thread();
00157 }
00158
00159 Persistent_Storage_Block*
00160 Persistent_File_Allocator::allocate()
00161 {
00162 Persistent_Storage_Block* result = 0;
00163 size_t block_number = 0;
00164 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, 0);
00165 if (!this->allocate_block(block_number))
00166 {
00167
00168
00169
00170 }
00171 if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG,
00172 ACE_TEXT ("(%P|%t) Persistent_File_Allocator::allocate: %d\n"),
00173 static_cast<int> (block_number)
00174 ));
00175 result = this->allocate_at(block_number);
00176 return result;
00177 }
00178
00179 Persistent_Storage_Block*
00180 Persistent_File_Allocator::allocate_at(size_t block_number)
00181 {
00182 Persistent_Storage_Block* result = 0;
00183 this->used(block_number);
00184 if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG,
00185 ACE_TEXT ("(%P|%t) Persistent_File_Allocator::allocate at : %d\n"),
00186 static_cast<int> (block_number)
00187 ));
00188 ACE_NEW_RETURN(result, Persistent_Storage_Block(
00189 block_number,
00190 this->block_size()),
00191 0);
00192 return result;
00193 }
00194
00195 Persistent_Storage_Block*
00196 Persistent_File_Allocator::allocate_nowrite()
00197 {
00198 Persistent_Storage_Block* result = 0;
00199 ACE_NEW_RETURN (result,
00200 Persistent_Storage_Block (static_cast<size_t> (~0), 0),
00201 0);
00202 result->set_no_write();
00203
00204 return result;
00205 }
00206
00207 void
00208 Persistent_File_Allocator::used(size_t block_number)
00209 {
00210 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->free_blocks_lock_);
00211 if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG,
00212 ACE_TEXT ("(%P|%t) Persistent_File_Allocator::used: %d\n"),
00213 static_cast<int> (block_number)
00214 ));
00215 ACE_ASSERT (!this->free_blocks_.is_set (block_number));
00216 this->free_blocks_.set_bit(block_number, true);
00217 }
00218
00219 void
00220 Persistent_File_Allocator::free(size_t block_number)
00221 {
00222 if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG,
00223 ACE_TEXT ("(%P|%t) Persistent_File_Allocator::free: %d\n"),
00224 static_cast<int> (block_number)
00225 ));
00226 ACE_ASSERT (this->free_blocks_.is_set (block_number));
00227 this->free_block(block_number);
00228 }
00229
00230 size_t
00231 Persistent_File_Allocator::block_size() const
00232 {
00233 return pstore_.block_size();
00234 }
00235
00236 bool
00237 Persistent_File_Allocator::read(Persistent_Storage_Block* psb)
00238 {
00239 bool result = this->thread_active_;
00240 bool cached = false;
00241 if (result)
00242 {
00243 Persistent_Storage_Block** psbtemp = 0;
00244 {
00245 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_lock_, false);
00246 size_t queue_size = this->block_queue_.size();
00247 for (size_t idx = 0; !cached && (idx < queue_size); ++idx)
00248 {
00249
00250 size_t actual_block = (queue_size - idx) - 1;
00251 if ((0 == this->block_queue_.get(psbtemp, actual_block))
00252 && (psbtemp != 0))
00253 {
00254 cached = ((*psbtemp)->block_number() == psb->block_number());
00255 }
00256 }
00257
00258 if (cached && (0 != psbtemp))
00259 {
00260 ACE_OS::memcpy(psb->data(), (*psbtemp)->data(), this->block_size());
00261 }
00262 }
00263 if (!cached)
00264 {
00265 result = pstore_.read(psb->block_number(), psb->data());
00266 }
00267 }
00268 return result;
00269 }
00270
00271 bool
00272 Persistent_File_Allocator::write(Persistent_Storage_Block* psb)
00273 {
00274 bool result = this->thread_active_;
00275 if (result)
00276 {
00277 Persistent_Storage_Block* ourpsb = psb;
00278 if (!psb->get_allocator_owns())
00279 {
00280 if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00281 ACE_TEXT ("(%P|%t) Copy PSB %d\n")
00282 , static_cast<int> (psb->block_number ())
00283 ));
00284 ACE_NEW_RETURN(ourpsb, Persistent_Storage_Block(*psb), false);
00285 ourpsb->set_allocator_owns(true);
00286 }
00287 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_lock_, false);
00288 if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00289 ACE_TEXT ("(%P|%t) Queueing PSB to write block %d\n")
00290 , static_cast<int> (psb->block_number ())
00291 ));
00292 result = (0 == this->block_queue_.enqueue_tail(ourpsb));
00293 this->wake_up_thread_.signal();
00294 }
00295 return result;
00296 }
00297
00298 void
00299 Persistent_File_Allocator::free_block(const size_t block_number)
00300 {
00301 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->free_blocks_lock_);
00302 ACE_ASSERT (this->free_blocks_.is_set (block_number));
00303 this->free_blocks_.set_bit(block_number, false);
00304 }
00305
00306 bool
00307 Persistent_File_Allocator::allocate_block(size_t& block_number)
00308 {
00309 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->free_blocks_lock_, 0);
00310 block_number = this->free_blocks_.find_first_bit(false);
00311 return true;
00312 }
00313
00314 ACE_THR_FUNC_RETURN
00315 Persistent_File_Allocator::thr_func(void * arg)
00316 {
00317 Persistent_File_Allocator* pfa = static_cast<Persistent_File_Allocator*> (arg);
00318 pfa->run();
00319 return 0;
00320 }
00321
00322 ACE_OFF_T
00323 Persistent_File_Allocator::file_size () const
00324 {
00325 return this->pstore_.size ();
00326 }
00327
00328 void
00329 Persistent_File_Allocator::shutdown_thread()
00330 {
00331 if (this->thread_active_)
00332 {
00333 {
00334 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->queue_lock_);
00335 this->terminate_thread_ = true;
00336 this->wake_up_thread_.signal();
00337 }
00338 this->thread_manager_.close();
00339 ACE_ASSERT (!this->terminate_thread_);
00340 ACE_ASSERT (!this->thread_active_);
00341 }
00342 }
00343
00344 void
00345 Persistent_File_Allocator::run()
00346 {
00347
00348
00349 bool do_more_work = true;
00350 while (do_more_work)
00351 {
00352 do_more_work = false;
00353 Persistent_Storage_Block * blk = 0;
00354 {
00355 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->queue_lock_);
00356 while (this->block_queue_.is_empty() && !terminate_thread_)
00357 {
00358 this->wake_up_thread_.wait();
00359 }
00360
00361 Persistent_Storage_Block ** pblk = 0;
00362 if (0 == this->block_queue_.get(pblk))
00363 {
00364 do_more_work = true;
00365 blk = *pblk;
00366 }
00367 }
00368 if (0 != blk)
00369 {
00370 Persistent_Callback *callback = blk->get_callback();
00371 if (!blk->get_no_write())
00372 {
00373 pstore_.write(blk->block_number(), blk->data(), blk->get_sync());
00374 }
00375 {
00376 Persistent_Storage_Block * blk2 = 0;
00377 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->queue_lock_);
00378 this->block_queue_.dequeue_head (blk2);
00379
00380
00381 ACE_ASSERT (blk2 == blk);
00382 }
00383
00384 if (blk->get_allocator_owns())
00385 {
00386 delete blk;
00387 blk = 0;
00388 }
00389 if (0 != callback)
00390 {
00391 callback->persist_complete();
00392 }
00393 }
00394 }
00395 this->terminate_thread_ = false;
00396 this->thread_active_ = false;
00397 }
00398
00399 }
00400
00401 TAO_END_VERSIONED_NAMESPACE_DECL