00001
00002
00003 #include "orbsvcs/Notify/Persistent_File_Allocator.h"
00004
00005 #include "tao/debug.h"
00006 #include "ace/OS_NS_string.h"
00007
00008
00009 #ifndef DEBUG_LEVEL
00010 # define DEBUG_LEVEL TAO_debug_level
00011 #endif //DEBUG_LEVEL
00012
00013 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00014
00015 namespace TAO_Notify
00016 {
00017
00018 Persistent_Callback::~Persistent_Callback()
00019 {
00020 }
00021
00022 Persistent_Storage_Block::Persistent_Storage_Block(const size_t block_number,
00023 const size_t block_size)
00024 : block_number_(block_number)
00025 , no_write_(false)
00026 , sync_(false)
00027 , block_size_(block_size)
00028 , callback_(0)
00029 , allocator_owns_(true)
00030 {
00031 ACE_NEW(this->data_, unsigned char[this->block_size_]);
00032 ACE_OS::memset(this->data_, 0, this->block_size_);
00033
00034 }
00035
00036 Persistent_Storage_Block::Persistent_Storage_Block(
00037 const Persistent_Storage_Block& psb)
00038 : block_number_(psb.block_number_)
00039 , no_write_(psb.no_write_)
00040 , sync_(psb.sync_)
00041 , block_size_(psb.block_size_)
00042 , callback_(psb.callback_)
00043 , allocator_owns_(psb.allocator_owns_)
00044 {
00045 ACE_NEW(this->data_, unsigned char[this->block_size_]);
00046 ACE_OS::memcpy(this->data_, psb.data(), this->block_size_);
00047 }
00048
00049 Persistent_Storage_Block::~Persistent_Storage_Block()
00050 {
00051 delete [] this->data_;
00052 this->data_ = 0;
00053 }
00054
00055 void
00056 Persistent_Storage_Block::set_no_write()
00057 {
00058 this->no_write_ = true;
00059 this->reassign_data(0, true);
00060 }
00061
00062 bool
00063 Persistent_Storage_Block::get_no_write()
00064 {
00065 return this->no_write_;
00066 }
00067
00068 void
00069 Persistent_Storage_Block::set_sync()
00070 {
00071 this->sync_ = true;
00072 }
00073
00074 bool
00075 Persistent_Storage_Block::get_sync() const
00076 {
00077 return this->sync_;
00078 }
00079
00080 size_t
00081 Persistent_Storage_Block::block_number() const
00082 {
00083 return this->block_number_;
00084 }
00085
00086 unsigned char*
00087 Persistent_Storage_Block::data() const
00088 {
00089 return this->data_;
00090 }
00091
00092 void
00093 Persistent_Storage_Block::reassign_data(unsigned char* newptr,
00094 bool delete_old)
00095 {
00096 if (delete_old)
00097 {
00098 delete [] this->data_;
00099 }
00100 this->data_ = newptr;
00101 }
00102
00103 void
00104 Persistent_Storage_Block::set_callback(Persistent_Callback* callback)
00105 {
00106 this->callback_ = callback;
00107 }
00108
00109 Persistent_Callback*
00110 Persistent_Storage_Block::get_callback() const
00111 {
00112 return this->callback_;
00113 }
00114
00115 void
00116 Persistent_Storage_Block::set_allocator_owns(bool allocator_owns)
00117 {
00118 this->allocator_owns_ = allocator_owns;
00119 }
00120
00121 bool
00122 Persistent_Storage_Block::get_allocator_owns() const
00123 {
00124 return this->allocator_owns_;
00125 }
00126
00127 Persistent_File_Allocator::Persistent_File_Allocator()
00128 : pstore_()
00129 , terminate_thread_(false)
00130 , thread_active_(false)
00131 , wake_up_thread_(queue_lock_)
00132 {
00133 }
00134
00135 Persistent_File_Allocator::~Persistent_File_Allocator()
00136 {
00137 this->shutdown_thread();
00138 }
00139
00140 bool
00141 Persistent_File_Allocator::open (const ACE_TCHAR* filename,
00142 const size_t block_size)
00143 {
00144 bool file_opened = this->pstore_.open(filename, block_size);
00145 if (file_opened)
00146 {
00147 this->thread_active_ = true;
00148 this->thread_manager_.spawn(this->thr_func, this);
00149 }
00150 return file_opened;
00151 }
00152
00153 void
00154 Persistent_File_Allocator::shutdown()
00155 {
00156 this->shutdown_thread();
00157 }
00158
00159 Persistent_Storage_Block*
00160 Persistent_File_Allocator::allocate()
00161 {
00162 Persistent_Storage_Block* result = 0;
00163 size_t block_number = 0;
00164 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0);
00165 if (!this->allocate_block(block_number))
00166 {
00167
00168
00169
00170 }
00171 if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG,
00172 ACE_TEXT ("(%P|%t) Persistent_File_Allocator::allocate: %d\n"),
00173 static_cast<int> (block_number)
00174 ));
00175 result = this->allocate_at(block_number);
00176 return result;
00177 }
00178
00179 Persistent_Storage_Block*
00180 Persistent_File_Allocator::allocate_at(size_t block_number)
00181 {
00182 Persistent_Storage_Block* result = 0;
00183 this->used(block_number);
00184 if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG,
00185 ACE_TEXT ("(%P|%t) Persistent_File_Allocator::allocate at : %d\n"),
00186 static_cast<int> (block_number)
00187 ));
00188 ACE_NEW_RETURN(result, Persistent_Storage_Block(
00189 block_number,
00190 this->block_size()),
00191 0);
00192 return result;
00193 }
00194
00195 Persistent_Storage_Block*
00196 Persistent_File_Allocator::allocate_nowrite()
00197 {
00198 Persistent_Storage_Block* result = 0;
00199 ACE_NEW_RETURN (result,
00200 Persistent_Storage_Block (static_cast<size_t> (~0), 0),
00201 0);
00202 result->set_no_write();
00203
00204 return result;
00205 }
00206
00207 void
00208 Persistent_File_Allocator::used(size_t block_number)
00209 {
00210 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->free_blocks_lock_);
00211 if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG,
00212 ACE_TEXT ("(%P|%t) Persistent_File_Allocator::used: %d\n"),
00213 static_cast<int> (block_number)
00214 ));
00215 ACE_ASSERT (!this->free_blocks_.is_set (block_number));
00216 this->free_blocks_.set_bit(block_number, true);
00217 }
00218
00219 void
00220 Persistent_File_Allocator::free(size_t block_number)
00221 {
00222 if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG,
00223 ACE_TEXT ("(%P|%t) Persistent_File_Allocator::free: %d\n"),
00224 static_cast<int> (block_number)
00225 ));
00226 ACE_ASSERT (this->free_blocks_.is_set (block_number));
00227 this->free_block(block_number);
00228 }
00229
00230 size_t
00231 Persistent_File_Allocator::block_size() const
00232 {
00233 return pstore_.block_size();
00234 }
00235
00236 bool
00237 Persistent_File_Allocator::read(Persistent_Storage_Block* psb)
00238 {
00239 bool result = this->thread_active_;
00240 bool cached = false;
00241 if (result)
00242 {
00243 Persistent_Storage_Block** psbtemp = 0;
00244 {
00245 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->queue_lock_, false);
00246 size_t queue_size = this->block_queue_.size();
00247 for (size_t idx = 0; !cached && (idx < queue_size); ++idx)
00248 {
00249
00250 size_t actual_block = (queue_size - idx) - 1;
00251 if (0 == this->block_queue_.get(psbtemp, actual_block))
00252 {
00253 cached = ((*psbtemp)->block_number() == psb->block_number());
00254 }
00255 }
00256
00257 if (cached && (0 != psbtemp))
00258 {
00259 ACE_OS::memcpy(psb->data(), (*psbtemp)->data(), this->block_size());
00260 }
00261 }
00262 if (!cached)
00263 {
00264 result = pstore_.read(psb->block_number(), psb->data());
00265 }
00266 }
00267 return result;
00268 }
00269
00270 bool
00271 Persistent_File_Allocator::write(Persistent_Storage_Block* psb)
00272 {
00273 bool result = this->thread_active_;
00274 if (result)
00275 {
00276 Persistent_Storage_Block* ourpsb = psb;
00277 if (!psb->get_allocator_owns())
00278 {
00279 if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00280 ACE_TEXT ("(%P|%t) Copy PSB %d\n")
00281 , static_cast<int> (psb->block_number ())
00282 ));
00283 ACE_NEW_RETURN(ourpsb, Persistent_Storage_Block(*psb), false);
00284 ourpsb->set_allocator_owns(true);
00285 }
00286 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->queue_lock_, false);
00287 if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00288 ACE_TEXT ("(%P|%t) Queueing PSB to write block %d\n")
00289 , static_cast<int> (psb->block_number ())
00290 ));
00291 result = (0 == this->block_queue_.enqueue_tail(ourpsb));
00292 this->wake_up_thread_.signal();
00293 }
00294 return result;
00295 }
00296
00297 void
00298 Persistent_File_Allocator::free_block(const size_t block_number)
00299 {
00300 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->free_blocks_lock_);
00301 ACE_ASSERT (this->free_blocks_.is_set (block_number));
00302 this->free_blocks_.set_bit(block_number, false);
00303 }
00304
00305 bool
00306 Persistent_File_Allocator::allocate_block(size_t& block_number)
00307 {
00308 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->free_blocks_lock_, 0);
00309 block_number = this->free_blocks_.find_first_bit(false);
00310 return true;
00311 }
00312
00313 ACE_THR_FUNC_RETURN
00314 Persistent_File_Allocator::thr_func(void * arg)
00315 {
00316 Persistent_File_Allocator* pfa = static_cast<Persistent_File_Allocator*> (arg);
00317 pfa->run();
00318 return 0;
00319 }
00320
00321 size_t
00322 Persistent_File_Allocator::file_size () const
00323 {
00324 return this->pstore_.size ();
00325 }
00326
00327 void
00328 Persistent_File_Allocator::shutdown_thread()
00329 {
00330 if (this->thread_active_)
00331 {
00332 {
00333 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->queue_lock_);
00334 this->terminate_thread_ = true;
00335 this->wake_up_thread_.signal();
00336 }
00337 this->thread_manager_.close();
00338 ACE_ASSERT (!this->terminate_thread_);
00339 ACE_ASSERT (!this->thread_active_);
00340 }
00341 }
00342
00343 void
00344 Persistent_File_Allocator::run()
00345 {
00346
00347
00348 bool do_more_work = true;
00349 while (do_more_work)
00350 {
00351 do_more_work = false;
00352 Persistent_Storage_Block * blk = 0;
00353 {
00354 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->queue_lock_);
00355 while (this->block_queue_.is_empty() && !terminate_thread_)
00356 {
00357 this->wake_up_thread_.wait();
00358 }
00359
00360 Persistent_Storage_Block ** pblk = 0;
00361 if (0 == this->block_queue_.get(pblk))
00362 {
00363 do_more_work = true;
00364 blk = *pblk;
00365 }
00366 }
00367 if (0 != blk)
00368 {
00369 Persistent_Callback *callback = blk->get_callback();
00370 if (!blk->get_no_write())
00371 {
00372 pstore_.write(blk->block_number(), blk->data(), blk->get_sync());
00373 }
00374 {
00375 Persistent_Storage_Block * blk2 = 0;
00376 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->queue_lock_);
00377 this->block_queue_.dequeue_head (blk2);
00378
00379
00380 ACE_ASSERT (blk2 == blk);
00381 }
00382
00383 if (blk->get_allocator_owns())
00384 {
00385 delete blk;
00386 blk = 0;
00387 }
00388 if (0 != callback)
00389 {
00390 callback->persist_complete();
00391 }
00392 }
00393 }
00394 this->terminate_thread_ = false;
00395 this->thread_active_ = false;
00396 }
00397
00398 }
00399
00400 TAO_END_VERSIONED_NAMESPACE_DECL