Persistent_File_Allocator.cpp

Go to the documentation of this file.
00001 // Persistent_File_Allocator.cpp,v 1.11 2006/03/14 06:14:34 jtc Exp
00002 
00003 #include "orbsvcs/Notify/Persistent_File_Allocator.h"
00004 
00005 #include "tao/debug.h"
00006 #include "ace/OS_NS_string.h"
00007 
00008 //#define DEBUG_LEVEL 9
00009 #ifndef DEBUG_LEVEL
00010 # define DEBUG_LEVEL TAO_debug_level
00011 #endif //DEBUG_LEVEL
00012 
00013 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00014 
00015 namespace TAO_Notify
00016 {
00017 
00018 Persistent_Callback::~Persistent_Callback()
00019 {
00020 }
00021 
00022 Persistent_Storage_Block::Persistent_Storage_Block(const size_t block_number,
00023   const size_t block_size)
00024   : block_number_(block_number)
00025   , no_write_(false)
00026   , sync_(false)
00027   , block_size_(block_size)
00028   , callback_(0)
00029   , allocator_owns_(true)
00030 {
00031   ACE_NEW(this->data_, unsigned char[this->block_size_]);
00032   ACE_OS::memset(this->data_, 0, this->block_size_);
00033 
00034 }
00035 
00036 Persistent_Storage_Block::Persistent_Storage_Block(
00037   const Persistent_Storage_Block& psb)
00038   : block_number_(psb.block_number_)
00039   , no_write_(psb.no_write_)
00040   , sync_(psb.sync_)
00041   , block_size_(psb.block_size_)
00042   , callback_(psb.callback_)
00043   , allocator_owns_(psb.allocator_owns_)
00044 {
00045   ACE_NEW(this->data_, unsigned char[this->block_size_]);
00046   ACE_OS::memcpy(this->data_, psb.data(), this->block_size_);
00047 }
00048 
00049 Persistent_Storage_Block::~Persistent_Storage_Block()
00050 {
00051   delete [] this->data_;
00052   this->data_ = 0;
00053 }
00054 
00055 void
00056 Persistent_Storage_Block::set_no_write()
00057 {
00058   this->no_write_ = true;
00059   this->reassign_data(0, true);
00060 }
00061 
00062 bool
00063 Persistent_Storage_Block::get_no_write()
00064 {
00065   return this->no_write_;
00066 }
00067 
00068 void
00069 Persistent_Storage_Block::set_sync()
00070 {
00071   this->sync_ = true;
00072 }
00073 
00074 bool
00075 Persistent_Storage_Block::get_sync() const
00076 {
00077   return this->sync_;
00078 }
00079 
00080 size_t
00081 Persistent_Storage_Block::block_number() const
00082 {
00083   return this->block_number_;
00084 }
00085 
00086 unsigned char*
00087 Persistent_Storage_Block::data() const
00088 {
00089   return this->data_;
00090 }
00091 
00092 void
00093 Persistent_Storage_Block::reassign_data(unsigned char* newptr,
00094   bool delete_old)
00095 {
00096   if (delete_old)
00097   {
00098     delete [] this->data_;
00099   }
00100   this->data_ = newptr;
00101 }
00102 
00103 void
00104 Persistent_Storage_Block::set_callback(Persistent_Callback* callback)
00105 {
00106   this->callback_ = callback;
00107 }
00108 
00109 Persistent_Callback*
00110 Persistent_Storage_Block::get_callback() const
00111 {
00112   return this->callback_;
00113 }
00114 
00115 void
00116 Persistent_Storage_Block::set_allocator_owns(bool allocator_owns)
00117 {
00118   this->allocator_owns_ = allocator_owns;
00119 }
00120 
00121 bool
00122 Persistent_Storage_Block::get_allocator_owns() const
00123 {
00124   return this->allocator_owns_;
00125 }
00126 
00127 Persistent_File_Allocator::Persistent_File_Allocator()
00128   : pstore_()
00129   , terminate_thread_(false)
00130   , thread_active_(false)
00131   , wake_up_thread_(queue_lock_)
00132 {
00133 }
00134 
00135 Persistent_File_Allocator::~Persistent_File_Allocator()
00136 {
00137   this->shutdown_thread();
00138 }
00139 
00140 bool
00141 Persistent_File_Allocator::open (const ACE_TCHAR* filename,
00142   const size_t block_size)
00143 {
00144   bool file_opened = this->pstore_.open(filename, block_size);
00145   if (file_opened)
00146   {
00147     this->thread_active_ = true;
00148     this->thread_manager_.spawn(this->thr_func, this);
00149   }
00150   return file_opened;
00151 }
00152 
00153 void
00154 Persistent_File_Allocator::shutdown()
00155 {
00156   this->shutdown_thread();
00157 }
00158 
00159 Persistent_Storage_Block*
00160 Persistent_File_Allocator::allocate()
00161 {
00162   Persistent_Storage_Block* result = 0;
00163   size_t block_number = 0;
00164   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0);
00165   if (!this->allocate_block(block_number))
00166   {
00167     //@@todo: this should never happen
00168     // why not.  What if the disk is full?  Oh, I see we
00169     // allocate non-existent blocks.  FIX this
00170   }
00171   if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG,
00172     ACE_TEXT ("(%P|%t) Persistent_File_Allocator::allocate: %d\n"),
00173     static_cast<int> (block_number)
00174     ));
00175   result = this->allocate_at(block_number);
00176   return result;
00177 }
00178 
00179 Persistent_Storage_Block*
00180 Persistent_File_Allocator::allocate_at(size_t block_number)
00181 {
00182   Persistent_Storage_Block* result = 0;
00183   this->used(block_number);
00184   if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG,
00185     ACE_TEXT ("(%P|%t) Persistent_File_Allocator::allocate at : %d\n"),
00186     static_cast<int> (block_number)
00187     ));
00188   ACE_NEW_RETURN(result, Persistent_Storage_Block(
00189     block_number,
00190     this->block_size()),
00191     0);
00192   return result;
00193 }
00194 
00195 Persistent_Storage_Block*
00196 Persistent_File_Allocator::allocate_nowrite()
00197 {
00198   Persistent_Storage_Block* result = 0;
00199   ACE_NEW_RETURN (result,
00200                   Persistent_Storage_Block (static_cast<size_t> (~0), 0),
00201                   0);
00202   result->set_no_write();
00203 
00204   return result;
00205 }
00206 
00207 void
00208 Persistent_File_Allocator::used(size_t block_number)
00209 {
00210   ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->free_blocks_lock_);
00211   if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG,
00212     ACE_TEXT ("(%P|%t) Persistent_File_Allocator::used: %d\n"),
00213     static_cast<int> (block_number)
00214     ));
00215   ACE_ASSERT (!this->free_blocks_.is_set (block_number));
00216   this->free_blocks_.set_bit(block_number, true);
00217 }
00218 
00219 void
00220 Persistent_File_Allocator::free(size_t block_number)
00221 {
00222   if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG,
00223     ACE_TEXT ("(%P|%t) Persistent_File_Allocator::free: %d\n"),
00224     static_cast<int> (block_number)
00225     ));
00226   ACE_ASSERT (this->free_blocks_.is_set (block_number));
00227   this->free_block(block_number);
00228 }
00229 
00230 size_t
00231 Persistent_File_Allocator::block_size() const
00232 {
00233   return pstore_.block_size();
00234 }
00235 
00236 bool
00237 Persistent_File_Allocator::read(Persistent_Storage_Block* psb)
00238 {
00239   bool result = this->thread_active_;
00240   bool cached = false;
00241   if (result)
00242   {
00243     Persistent_Storage_Block** psbtemp = 0;
00244     {
00245       ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->queue_lock_, false);
00246       size_t queue_size = this->block_queue_.size();
00247       for (size_t idx = 0; !cached && (idx < queue_size); ++idx)
00248       {
00249         // We want to start at the end of the queue and work backwards...
00250         size_t actual_block = (queue_size - idx) - 1;
00251         if (0 == this->block_queue_.get(psbtemp, actual_block))
00252         {
00253           cached = ((*psbtemp)->block_number() == psb->block_number());
00254         }
00255       }
00256       // this needs to be done in the guarded section
00257       if (cached && (0 != psbtemp))
00258       {
00259         ACE_OS::memcpy(psb->data(), (*psbtemp)->data(), this->block_size());
00260       }
00261     }
00262     if (!cached)
00263     {
00264       result = pstore_.read(psb->block_number(), psb->data());
00265     }
00266   }
00267   return result;
00268 }
00269 
00270 bool
00271 Persistent_File_Allocator::write(Persistent_Storage_Block* psb)
00272 {
00273   bool result = this->thread_active_;
00274   if (result)
00275   {
00276     Persistent_Storage_Block* ourpsb = psb;
00277     if (!psb->get_allocator_owns())
00278     {
00279       if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00280         ACE_TEXT ("(%P|%t) Copy PSB %d\n")
00281         , static_cast<int> (psb->block_number ())
00282         ));
00283       ACE_NEW_RETURN(ourpsb, Persistent_Storage_Block(*psb), false);
00284       ourpsb->set_allocator_owns(true);
00285     }
00286     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->queue_lock_, false);
00287     if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00288       ACE_TEXT ("(%P|%t) Queueing PSB to write block %d\n")
00289       , static_cast<int> (psb->block_number ())
00290       ));
00291     result = (0 == this->block_queue_.enqueue_tail(ourpsb));
00292     this->wake_up_thread_.signal();
00293   }
00294   return result;
00295 }
00296 
00297 void
00298 Persistent_File_Allocator::free_block(const size_t block_number)
00299 {
00300   ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->free_blocks_lock_);
00301   ACE_ASSERT (this->free_blocks_.is_set (block_number));
00302   this->free_blocks_.set_bit(block_number, false);
00303 }
00304 
00305 bool
00306 Persistent_File_Allocator::allocate_block(size_t& block_number)
00307 {
00308   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->free_blocks_lock_, 0);
00309   block_number = this->free_blocks_.find_first_bit(false);
00310   return true;
00311 }
00312 
00313 ACE_THR_FUNC_RETURN
00314 Persistent_File_Allocator::thr_func(void * arg)
00315 {
00316   Persistent_File_Allocator* pfa = static_cast<Persistent_File_Allocator*> (arg);
00317   pfa->run();
00318   return 0;
00319 }
00320 
00321 size_t
00322 Persistent_File_Allocator::file_size () const
00323 {
00324   return this->pstore_.size ();
00325 }
00326 
00327 void
00328 Persistent_File_Allocator::shutdown_thread()
00329 {
00330   if (this->thread_active_)
00331   {
00332     {
00333       ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->queue_lock_);
00334       this->terminate_thread_ = true;
00335       this->wake_up_thread_.signal();
00336     }
00337     this->thread_manager_.close();
00338     ACE_ASSERT (!this->terminate_thread_);
00339     ACE_ASSERT (!this->thread_active_);
00340   }
00341 }
00342 
00343 void
00344 Persistent_File_Allocator::run()
00345 {
00346   // We need this because we could be working on writing data
00347   // when a call to terminate comes in!
00348   bool do_more_work = true;
00349   while (do_more_work)
00350   {
00351     do_more_work = false;
00352     Persistent_Storage_Block * blk = 0;
00353     {
00354       ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->queue_lock_);
00355       while (this->block_queue_.is_empty() && !terminate_thread_)
00356       {
00357         this->wake_up_thread_.wait();
00358       }
00359       // Awkward interface to peek at head of unbounded queue
00360       Persistent_Storage_Block ** pblk = 0;
00361       if (0 == this->block_queue_.get(pblk))
00362       {
00363         do_more_work = true;
00364         blk = *pblk;
00365       }
00366     }
00367     if (0 != blk)
00368     {
00369       Persistent_Callback *callback = blk->get_callback();
00370       if (!blk->get_no_write())
00371       {
00372         pstore_.write(blk->block_number(), blk->data(), blk->get_sync());
00373       }
00374       {
00375         Persistent_Storage_Block * blk2 = 0;
00376         ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->queue_lock_);
00377         this->block_queue_.dequeue_head (blk2);
00378         // if this triggers, someone pushed onto the head of the queue
00379         // or removed the head from the queue without telling ME.
00380         ACE_ASSERT (blk2 == blk);
00381       }
00382       // If we own the block, then delete it.
00383       if (blk->get_allocator_owns())
00384       {
00385         delete blk;
00386         blk = 0;
00387       }
00388       if (0 != callback)
00389       {
00390         callback->persist_complete();
00391       }
00392     }
00393   }
00394   this->terminate_thread_ = false;
00395   this->thread_active_ = false;
00396 }
00397 
00398 } /* namespace TAO_Notify */
00399 
00400 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:24:13 2006 for TAO_CosNotification by doxygen 1.3.6