Persistent_File_Allocator.cpp

Go to the documentation of this file.
00001 // $Id: Persistent_File_Allocator.cpp 80166 2007-12-03 13:53:49Z sowayaa $
00002 
00003 #include "orbsvcs/Notify/Persistent_File_Allocator.h"
00004 
00005 #include "tao/debug.h"
00006 #include "ace/OS_NS_string.h"
00007 
00008 //#define DEBUG_LEVEL 9
00009 #ifndef DEBUG_LEVEL
00010 # define DEBUG_LEVEL TAO_debug_level
00011 #endif //DEBUG_LEVEL
00012 
00013 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00014 
00015 namespace TAO_Notify
00016 {
00017 
00018 Persistent_Callback::~Persistent_Callback()
00019 {
00020 }
00021 
00022 Persistent_Storage_Block::Persistent_Storage_Block(const size_t block_number,
00023   const size_t block_size)
00024   : block_number_(block_number)
00025   , no_write_(false)
00026   , sync_(false)
00027   , block_size_(block_size)
00028   , callback_(0)
00029   , allocator_owns_(true)
00030 {
00031   ACE_NEW(this->data_, unsigned char[this->block_size_]);
00032   ACE_OS::memset(this->data_, 0, this->block_size_);
00033 
00034 }
00035 
00036 Persistent_Storage_Block::Persistent_Storage_Block(
00037   const Persistent_Storage_Block& psb)
00038   : block_number_(psb.block_number_)
00039   , no_write_(psb.no_write_)
00040   , sync_(psb.sync_)
00041   , block_size_(psb.block_size_)
00042   , callback_(psb.callback_)
00043   , allocator_owns_(psb.allocator_owns_)
00044 {
00045   ACE_NEW(this->data_, unsigned char[this->block_size_]);
00046   ACE_OS::memcpy(this->data_, psb.data(), this->block_size_);
00047 }
00048 
00049 Persistent_Storage_Block::~Persistent_Storage_Block()
00050 {
00051   delete [] this->data_;
00052   this->data_ = 0;
00053 }
00054 
00055 void
00056 Persistent_Storage_Block::set_no_write()
00057 {
00058   this->no_write_ = true;
00059   this->reassign_data(0, true);
00060 }
00061 
00062 bool
00063 Persistent_Storage_Block::get_no_write()
00064 {
00065   return this->no_write_;
00066 }
00067 
00068 void
00069 Persistent_Storage_Block::set_sync()
00070 {
00071   this->sync_ = true;
00072 }
00073 
00074 bool
00075 Persistent_Storage_Block::get_sync() const
00076 {
00077   return this->sync_;
00078 }
00079 
00080 size_t
00081 Persistent_Storage_Block::block_number() const
00082 {
00083   return this->block_number_;
00084 }
00085 
00086 unsigned char*
00087 Persistent_Storage_Block::data() const
00088 {
00089   return this->data_;
00090 }
00091 
00092 void
00093 Persistent_Storage_Block::reassign_data(unsigned char* newptr,
00094   bool delete_old)
00095 {
00096   if (delete_old)
00097   {
00098     delete [] this->data_;
00099   }
00100   this->data_ = newptr;
00101 }
00102 
00103 void
00104 Persistent_Storage_Block::set_callback(Persistent_Callback* callback)
00105 {
00106   this->callback_ = callback;
00107 }
00108 
00109 Persistent_Callback*
00110 Persistent_Storage_Block::get_callback() const
00111 {
00112   return this->callback_;
00113 }
00114 
00115 void
00116 Persistent_Storage_Block::set_allocator_owns(bool allocator_owns)
00117 {
00118   this->allocator_owns_ = allocator_owns;
00119 }
00120 
00121 bool
00122 Persistent_Storage_Block::get_allocator_owns() const
00123 {
00124   return this->allocator_owns_;
00125 }
00126 
00127 Persistent_File_Allocator::Persistent_File_Allocator()
00128   : pstore_()
00129   , terminate_thread_(false)
00130   , thread_active_(false)
00131   , wake_up_thread_(queue_lock_)
00132 {
00133 }
00134 
00135 Persistent_File_Allocator::~Persistent_File_Allocator()
00136 {
00137   this->shutdown_thread();
00138 }
00139 
00140 bool
00141 Persistent_File_Allocator::open (const ACE_TCHAR* filename,
00142   const size_t block_size)
00143 {
00144   bool file_opened = this->pstore_.open(filename, block_size);
00145   if (file_opened)
00146   {
00147     this->thread_active_ = true;
00148     this->thread_manager_.spawn(this->thr_func, this);
00149   }
00150   return file_opened;
00151 }
00152 
00153 void
00154 Persistent_File_Allocator::shutdown()
00155 {
00156   this->shutdown_thread();
00157 }
00158 
00159 Persistent_Storage_Block*
00160 Persistent_File_Allocator::allocate()
00161 {
00162   Persistent_Storage_Block* result = 0;
00163   size_t block_number = 0;
00164   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, 0);
00165   if (!this->allocate_block(block_number))
00166   {
00167     //@@todo: this should never happen
00168     // why not.  What if the disk is full?  Oh, I see we
00169     // allocate non-existent blocks.  FIX this
00170   }
00171   if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG,
00172     ACE_TEXT ("(%P|%t) Persistent_File_Allocator::allocate: %d\n"),
00173     static_cast<int> (block_number)
00174     ));
00175   result = this->allocate_at(block_number);
00176   return result;
00177 }
00178 
00179 Persistent_Storage_Block*
00180 Persistent_File_Allocator::allocate_at(size_t block_number)
00181 {
00182   Persistent_Storage_Block* result = 0;
00183   this->used(block_number);
00184   if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG,
00185     ACE_TEXT ("(%P|%t) Persistent_File_Allocator::allocate at : %d\n"),
00186     static_cast<int> (block_number)
00187     ));
00188   ACE_NEW_RETURN(result, Persistent_Storage_Block(
00189     block_number,
00190     this->block_size()),
00191     0);
00192   return result;
00193 }
00194 
00195 Persistent_Storage_Block*
00196 Persistent_File_Allocator::allocate_nowrite()
00197 {
00198   Persistent_Storage_Block* result = 0;
00199   ACE_NEW_RETURN (result,
00200                   Persistent_Storage_Block (static_cast<size_t> (~0), 0),
00201                   0);
00202   result->set_no_write();
00203 
00204   return result;
00205 }
00206 
00207 void
00208 Persistent_File_Allocator::used(size_t block_number)
00209 {
00210   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->free_blocks_lock_);
00211   if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG,
00212     ACE_TEXT ("(%P|%t) Persistent_File_Allocator::used: %d\n"),
00213     static_cast<int> (block_number)
00214     ));
00215   ACE_ASSERT (!this->free_blocks_.is_set (block_number));
00216   this->free_blocks_.set_bit(block_number, true);
00217 }
00218 
00219 void
00220 Persistent_File_Allocator::free(size_t block_number)
00221 {
00222   if (DEBUG_LEVEL > 0) ACE_DEBUG ((LM_DEBUG,
00223     ACE_TEXT ("(%P|%t) Persistent_File_Allocator::free: %d\n"),
00224     static_cast<int> (block_number)
00225     ));
00226   ACE_ASSERT (this->free_blocks_.is_set (block_number));
00227   this->free_block(block_number);
00228 }
00229 
00230 size_t
00231 Persistent_File_Allocator::block_size() const
00232 {
00233   return pstore_.block_size();
00234 }
00235 
00236 bool
00237 Persistent_File_Allocator::read(Persistent_Storage_Block* psb)
00238 {
00239   bool result = this->thread_active_;
00240   bool cached = false;
00241   if (result)
00242   {
00243     Persistent_Storage_Block** psbtemp = 0;
00244     {
00245       ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_lock_, false);
00246       size_t queue_size = this->block_queue_.size();
00247       for (size_t idx = 0; !cached && (idx < queue_size); ++idx)
00248       {
00249         // We want to start at the end of the queue and work backwards...
00250         size_t actual_block = (queue_size - idx) - 1;
00251         if ((0 == this->block_queue_.get(psbtemp, actual_block))
00252             && (psbtemp != 0))
00253         {
00254           cached = ((*psbtemp)->block_number() == psb->block_number());
00255         }
00256       }
00257       // this needs to be done in the guarded section
00258       if (cached && (0 != psbtemp))
00259       {
00260         ACE_OS::memcpy(psb->data(), (*psbtemp)->data(), this->block_size());
00261       }
00262     }
00263     if (!cached)
00264     {
00265       result = pstore_.read(psb->block_number(), psb->data());
00266     }
00267   }
00268   return result;
00269 }
00270 
00271 bool
00272 Persistent_File_Allocator::write(Persistent_Storage_Block* psb)
00273 {
00274   bool result = this->thread_active_;
00275   if (result)
00276   {
00277     Persistent_Storage_Block* ourpsb = psb;
00278     if (!psb->get_allocator_owns())
00279     {
00280       if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00281         ACE_TEXT ("(%P|%t) Copy PSB %d\n")
00282         , static_cast<int> (psb->block_number ())
00283         ));
00284       ACE_NEW_RETURN(ourpsb, Persistent_Storage_Block(*psb), false);
00285       ourpsb->set_allocator_owns(true);
00286     }
00287     ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_lock_, false);
00288     if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG,
00289       ACE_TEXT ("(%P|%t) Queueing PSB to write block %d\n")
00290       , static_cast<int> (psb->block_number ())
00291       ));
00292     result = (0 == this->block_queue_.enqueue_tail(ourpsb));
00293     this->wake_up_thread_.signal();
00294   }
00295   return result;
00296 }
00297 
00298 void
00299 Persistent_File_Allocator::free_block(const size_t block_number)
00300 {
00301   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->free_blocks_lock_);
00302   ACE_ASSERT (this->free_blocks_.is_set (block_number));
00303   this->free_blocks_.set_bit(block_number, false);
00304 }
00305 
00306 bool
00307 Persistent_File_Allocator::allocate_block(size_t& block_number)
00308 {
00309   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->free_blocks_lock_, 0);
00310   block_number = this->free_blocks_.find_first_bit(false);
00311   return true;
00312 }
00313 
00314 ACE_THR_FUNC_RETURN
00315 Persistent_File_Allocator::thr_func(void * arg)
00316 {
00317   Persistent_File_Allocator* pfa = static_cast<Persistent_File_Allocator*> (arg);
00318   pfa->run();
00319   return 0;
00320 }
00321 
00322 ACE_OFF_T
00323 Persistent_File_Allocator::file_size () const
00324 {
00325   return this->pstore_.size ();
00326 }
00327 
00328 void
00329 Persistent_File_Allocator::shutdown_thread()
00330 {
00331   if (this->thread_active_)
00332   {
00333     {
00334       ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->queue_lock_);
00335       this->terminate_thread_ = true;
00336       this->wake_up_thread_.signal();
00337     }
00338     this->thread_manager_.close();
00339     ACE_ASSERT (!this->terminate_thread_);
00340     ACE_ASSERT (!this->thread_active_);
00341   }
00342 }
00343 
00344 void
00345 Persistent_File_Allocator::run()
00346 {
00347   // We need this because we could be working on writing data
00348   // when a call to terminate comes in!
00349   bool do_more_work = true;
00350   while (do_more_work)
00351   {
00352     do_more_work = false;
00353     Persistent_Storage_Block * blk = 0;
00354     {
00355       ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->queue_lock_);
00356       while (this->block_queue_.is_empty() && !terminate_thread_)
00357       {
00358         this->wake_up_thread_.wait();
00359       }
00360       // Awkward interface to peek at head of unbounded queue
00361       Persistent_Storage_Block ** pblk = 0;
00362       if (0 == this->block_queue_.get(pblk))
00363       {
00364         do_more_work = true;
00365         blk = *pblk;
00366       }
00367     }
00368     if (0 != blk)
00369     {
00370       Persistent_Callback *callback = blk->get_callback();
00371       if (!blk->get_no_write())
00372       {
00373         pstore_.write(blk->block_number(), blk->data(), blk->get_sync());
00374       }
00375       {
00376         Persistent_Storage_Block * blk2 = 0;
00377         ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->queue_lock_);
00378         this->block_queue_.dequeue_head (blk2);
00379         // if this triggers, someone pushed onto the head of the queue
00380         // or removed the head from the queue without telling ME.
00381         ACE_ASSERT (blk2 == blk);
00382       }
00383       // If we own the block, then delete it.
00384       if (blk->get_allocator_owns())
00385       {
00386         delete blk;
00387         blk = 0;
00388       }
00389       if (0 != callback)
00390       {
00391         callback->persist_complete();
00392       }
00393     }
00394   }
00395   this->terminate_thread_ = false;
00396   this->thread_active_ = false;
00397 }
00398 
00399 } /* namespace TAO_Notify */
00400 
00401 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:45:29 2010 for TAO_CosNotification by  doxygen 1.4.7