Routing_Slip_Persistence_Manager.cpp

Go to the documentation of this file.
00001 // $Id: Routing_Slip_Persistence_Manager.cpp 80166 2007-12-03 13:53:49Z sowayaa $
00002 
00003 #include "orbsvcs/Notify/Routing_Slip_Persistence_Manager.h"
00004 #include "orbsvcs/Notify/Standard_Event_Persistence.h"
00005 #include "orbsvcs/Notify/Persistent_File_Allocator.h"
00006 #include "ace/Truncate.h"
00007 
00008 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00009 
00010 namespace TAO_Notify
00011 {
00012 
00013 Routing_Slip_Persistence_Manager::Routing_Slip_Persistence_Manager(
00014   Standard_Event_Persistence_Factory* factory)
00015   : removed_(false)
00016   , serial_number_(0)
00017   , allocator_(factory->allocator())
00018   , factory_(factory)
00019   , first_event_block_(0)
00020   , first_routing_slip_block_(0)
00021   , callback_(0)
00022   , event_mb_ (0)
00023   , routing_slip_mb_(0)
00024 {
00025   this->prev_manager_ = this;
00026   this->next_manager_ = this;
00027 }
00028 
00029 Routing_Slip_Persistence_Manager::~Routing_Slip_Persistence_Manager()
00030 {
00031   ACE_ASSERT(this->prev_manager_ == this);
00032   ACE_ASSERT(this->next_manager_ == this);
00033   delete this->first_event_block_;
00034   this->first_event_block_ = 0;
00035   delete this->first_routing_slip_block_;
00036   this->first_routing_slip_block_ = 0;
00037   delete this->event_mb_;
00038   this->event_mb_ = 0;
00039   delete this->routing_slip_mb_;
00040   this->routing_slip_mb_ = 0;
00041 }
00042 
00043 void
00044 Routing_Slip_Persistence_Manager::set_callback(Persistent_Callback* callback)
00045 {
00046   ACE_GUARD(TAO_SYNCH_MUTEX, ace_mon, this->lock_);
00047   this->callback_ = callback;
00048 }
00049 
00050 bool
00051 Routing_Slip_Persistence_Manager::store_root()
00052 {
00053   bool result = false;
00054 
00055   this->factory_->get_preallocated_pointer (
00056     this->routing_slip_header_.next_serial_number,
00057     this->routing_slip_header_.next_routing_slip_block);
00058 
00059   // we should already have a psb, but JIC
00060   ACE_ASSERT(this->first_routing_slip_block_ != 0);
00061   ACE_ASSERT(this->first_routing_slip_block_->block_number() ==
00062     ROUTING_SLIP_ROOT_BLOCK_NUMBER);
00063 
00064   // Don't take any chances.  Use hard-wired root serial number.
00065   this->routing_slip_header_.serial_number = ROUTING_SLIP_ROOT_SERIAL_NUMBER;
00066 
00067   // This will eventually break after something like 58000 years.
00068   // At such time we should change this to !=.
00069   ACE_ASSERT(this->routing_slip_header_.next_serial_number >
00070     ROUTING_SLIP_ROOT_SERIAL_NUMBER);
00071 
00072   ACE_Message_Block versioninfo(2);
00073   versioninfo.wr_ptr()[0] = 1; // Major version number
00074   versioninfo.wr_ptr()[1] = 0; // Minor version number
00075   versioninfo.wr_ptr(2);
00076   ACE_GUARD_RETURN(TAO_SYNCH_MUTEX, ace_mon, this->lock_, result);
00077   result = this->build_chain(this->first_routing_slip_block_,
00078     this->routing_slip_header_, this->allocated_routing_slip_blocks_,
00079     versioninfo);
00080   if (result)
00081   {
00082    this->routing_slip_header_.put_header(*this->first_routing_slip_block_);
00083    this->allocator_->write(this->first_routing_slip_block_);
00084   }
00085   return result;
00086 }
00087 
00088 bool
00089 Routing_Slip_Persistence_Manager::reload(
00090   ACE_Message_Block*& event,
00091   ACE_Message_Block*& routing_slip)
00092 {
00093   bool result = false;
00094   if (this->event_mb_ != 0 && this->routing_slip_mb_ != 0)
00095   {
00096     event = this->event_mb_;
00097     this->event_mb_ = 0;
00098     routing_slip = this->routing_slip_mb_;
00099     this->routing_slip_mb_ = 0;
00100     result = true;
00101   }
00102   else
00103   {
00104     event = 0;
00105     routing_slip = 0;
00106   }
00107   return result;
00108 }
00109 
00110 bool
00111 Routing_Slip_Persistence_Manager::load(
00112   Block_Number block_number,
00113   Block_Serial_Number expected_serial_number)
00114 {
00115   /**
00116    * NOTE: There is no need to worry about guarding anything.  We assume
00117    *       that there will be one and only one thread doing the entire
00118    *       reload process.
00119    */
00120   bool result = false;
00121   size_t block_size = this->allocator_->block_size();
00122   this->first_routing_slip_block_ =
00123     this->allocator_->allocate_at(block_number);
00124   this->first_routing_slip_block_->set_allocator_owns(false);
00125   this->first_routing_slip_block_->set_sync();
00126 
00127   this->serial_number_ = expected_serial_number;
00128 
00129   ACE_NEW_NORETURN(this->routing_slip_mb_, ACE_Message_Block(block_size));
00130   ACE_NEW_NORETURN(this->event_mb_, ACE_Message_Block(block_size));
00131   if (this->event_mb_ != 0 && this->routing_slip_mb_ != 0)
00132   {
00133     if (this->reload_chain(
00134           this->first_routing_slip_block_,
00135           this->routing_slip_header_,
00136           this->allocated_routing_slip_blocks_,
00137           this->routing_slip_mb_,
00138           expected_serial_number))
00139     {
00140       if (this->routing_slip_header_.event_block != 0)
00141       {
00142         this->first_event_block_ = this->allocator_->allocate_at(
00143           this->routing_slip_header_.event_block);
00144         result = this->reload_chain(
00145           this->first_event_block_,
00146           this->event_header_,
00147           this->allocated_event_blocks_,
00148           this->event_mb_,
00149           0);
00150       }
00151       else if (block_number == ROUTING_SLIP_ROOT_BLOCK_NUMBER)
00152       {
00153         // only the root can lack event
00154         result = true;
00155       }
00156       else
00157       {
00158         ACE_ERROR((LM_ERROR,
00159           ACE_TEXT(
00160             "(%P|%t) Reloaded Persistent Event is missing event.\n")
00161           ));
00162       }
00163     }
00164   }
00165   if (! result)
00166   {
00167     delete this->routing_slip_mb_;
00168     this->routing_slip_mb_ = 0;
00169     delete this->event_mb_;
00170     this->event_mb_ = 0;
00171   }
00172   return result;
00173 }
00174 
00175 Routing_Slip_Persistence_Manager *
00176 Routing_Slip_Persistence_Manager::load_next ()
00177 {
00178   Routing_Slip_Persistence_Manager * result;
00179   ACE_NEW_RETURN(result, Routing_Slip_Persistence_Manager (this->factory_), 0);
00180 
00181   if (result->load(this->routing_slip_header_.next_routing_slip_block,
00182     this->routing_slip_header_.next_serial_number))
00183   {
00184     result->dllist_push_back();
00185   }
00186   else
00187   {
00188     // steal the psb for use as the next psb.
00189     // delete the rspm.  We'll create another one later.
00190     Persistent_Storage_Block * next_psb = result->first_routing_slip_block_;
00191     result->first_routing_slip_block_ = 0;
00192 //    next_psb->set_allocator_owns(true);
00193     this->factory_->done_reloading (
00194       next_psb,
00195       result->serial_number_);
00196     delete result;
00197     result = 0;
00198   }
00199   return result;
00200 }
00201 
00202 bool
00203 Routing_Slip_Persistence_Manager::store(const ACE_Message_Block& event,
00204   const ACE_Message_Block& routing_slip)
00205 {
00206   bool result = false;
00207   ACE_GUARD_RETURN(TAO_SYNCH_MUTEX, ace_mon, this->lock_, result);
00208   if (!this->removed_)
00209   {
00210     result = store_i(event, routing_slip);
00211   }
00212   return result;
00213 }
00214 
00215 bool
00216 Routing_Slip_Persistence_Manager::update(const ACE_Message_Block& routing_slip)
00217 {
00218   bool result = false;
00219   ACE_GUARD_RETURN(TAO_SYNCH_MUTEX, ace_mon, this->lock_, result);
00220   // If we have not gotten the event yet or we have no allocator, fail
00221   if (!this->removed_)
00222   {
00223     if (this->persisted())
00224     {
00225       result = update_i(routing_slip);
00226     }
00227   }
00228   return result;
00229 }
00230 
00231 bool
00232 Routing_Slip_Persistence_Manager::remove()
00233 {
00234   bool result = false;
00235   ACE_GUARD_RETURN(TAO_SYNCH_MUTEX, ace_mon, this->lock_, result);
00236   // Assert that this is in the dllist
00237   ACE_ASSERT(this->prev_manager_ != this);
00238   ACE_ASSERT(this->persisted());
00239   Routing_Slip_Persistence_Manager* prev = this->prev_manager_;
00240   // Once our previous manager removes us, we can deallocate in any order
00241   this->factory_->lock.acquire();
00242   this->remove_from_dllist();
00243   result = prev->update_next_manager(this);
00244   this->factory_->lock.release();
00245   size_t block_number = 0;
00246   if (this->first_routing_slip_block_ != 0)
00247   {
00248     this->allocator_->free(this->first_routing_slip_block_->block_number());
00249     delete this->first_routing_slip_block_;
00250     this->first_routing_slip_block_ = 0;
00251   }
00252   if (this->first_event_block_ != 0)
00253   {
00254     this->allocator_->free(this->first_event_block_->block_number());
00255     delete this->first_event_block_;
00256     this->first_event_block_ = 0;
00257   }
00258   while (this->allocated_routing_slip_blocks_.pop(block_number) == 0)
00259   {
00260     this->allocator_->free(block_number);
00261   }
00262   while (this->allocated_event_blocks_.pop(block_number) == 0)
00263   {
00264     this->allocator_->free(block_number);
00265   }
00266   this->removed_ = true;
00267   Persistent_Storage_Block* callbackblock =
00268     this->allocator_->allocate_nowrite();
00269   callbackblock->set_callback(this->callback_);
00270   result &= this->allocator_->write(callbackblock);
00271   return result;
00272 }
00273 
00274 Routing_Slip_Persistence_Manager::Block_Header::Block_Header(Header_Type type)
00275   : serial_number (0)
00276   , next_overflow(0)
00277   , header_type (static_cast<Block_Type> (type))
00278   , data_size(0)
00279 {
00280 }
00281 Routing_Slip_Persistence_Manager::Block_Header::~Block_Header (void)
00282 {
00283 }
00284 
00285 size_t
00286 Routing_Slip_Persistence_Manager::Block_Header::extract_header(
00287   Persistent_Storage_Block& psb, size_t offset)
00288 {
00289   size_t pos = offset;
00290   unsigned char* data = psb.data();
00291 
00292   serial_number = data[pos++];
00293   serial_number = (serial_number << 8) + data[pos++];
00294   serial_number = (serial_number << 8) + data[pos++];
00295   serial_number = (serial_number << 8) + data[pos++];
00296   serial_number = (serial_number << 8) + data[pos++];
00297   serial_number = (serial_number << 8) + data[pos++];
00298   serial_number = (serial_number << 8) + data[pos++];
00299   serial_number = (serial_number << 8) + data[pos++];
00300 
00301   next_overflow = data[pos++];
00302   next_overflow = (next_overflow << 8) + data[pos++];
00303   next_overflow = (next_overflow << 8) + data[pos++];
00304   next_overflow = (next_overflow << 8) + data[pos++];
00305 
00306   header_type = data[pos++];
00307   header_type = (data_size << 8) + data[pos++];
00308 
00309   data_size = data[pos++];
00310   data_size = (data_size << 8) + data[pos++];
00311   return pos;
00312 }
00313 
00314 size_t
00315 Routing_Slip_Persistence_Manager::Block_Header::put_header(
00316   Persistent_Storage_Block& psb, size_t offset)
00317 {
00318   // Assume that our psb can hold our small amount of data...
00319   size_t pos = offset;
00320   unsigned char* data = psb.data();
00321   // Store serial_number
00322   data[pos++] = static_cast<unsigned char> ((serial_number >> 56) & 0xff);
00323   data[pos++] = static_cast<unsigned char> ((serial_number >> 48) & 0xff);
00324   data[pos++] = static_cast<unsigned char> ((serial_number >> 40) & 0xff);
00325   data[pos++] = static_cast<unsigned char> ((serial_number >> 32) & 0xff);
00326   data[pos++] = static_cast<unsigned char> ((serial_number >> 24) & 0xff);
00327   data[pos++] = static_cast<unsigned char> ((serial_number >> 16) & 0xff);
00328   data[pos++] = static_cast<unsigned char> ((serial_number >> 8) & 0xff);
00329   data[pos++] = static_cast<unsigned char> ((serial_number >> 0) & 0xff);
00330   // Store next_overflow
00331   data[pos++] = static_cast<unsigned char> (next_overflow >> 24);
00332   data[pos++] = static_cast<unsigned char> ((next_overflow >> 16) & 0xff);
00333   data[pos++] = static_cast<unsigned char> ((next_overflow >> 8) & 0xff);
00334   data[pos++] = static_cast<unsigned char> (next_overflow & 0xff);
00335   // Store header_type
00336   data[pos++] = static_cast<unsigned char> ((header_type >> 8) & 0xff);
00337   data[pos++] = static_cast<unsigned char> (header_type & 0xff);
00338   // Store data_size
00339   data[pos++] = static_cast<unsigned char> ((data_size >> 8) & 0xff);
00340   data[pos++] = static_cast<unsigned char> (data_size & 0xff);
00341 
00342   return pos;
00343 }
00344 
00345 Routing_Slip_Persistence_Manager::Routing_Slip_Header::Routing_Slip_Header()
00346   : Block_Header (BT_Event)
00347   , next_routing_slip_block(0)
00348   , next_serial_number(0)
00349   , event_block(0)
00350 {
00351 }
00352 
00353 size_t
00354 Routing_Slip_Persistence_Manager::Routing_Slip_Header::extract_header(
00355   Persistent_Storage_Block& psb, size_t offset)
00356 {
00357   size_t pos = offset;
00358   pos = this->Block_Header::extract_header(psb, pos);
00359   unsigned char* data = psb.data();
00360   next_routing_slip_block = data[pos++];
00361   next_routing_slip_block = (next_routing_slip_block << 8) + data[pos++];
00362   next_routing_slip_block = (next_routing_slip_block << 8) + data[pos++];
00363   next_routing_slip_block = (next_routing_slip_block << 8) + data[pos++];
00364   next_serial_number = data[pos++];
00365   next_serial_number = (next_serial_number << 8) + data[pos++];
00366   next_serial_number = (next_serial_number << 8) + data[pos++];
00367   next_serial_number = (next_serial_number << 8) + data[pos++];
00368   next_serial_number = (next_serial_number << 8) + data[pos++];
00369   next_serial_number = (next_serial_number << 8) + data[pos++];
00370   next_serial_number = (next_serial_number << 8) + data[pos++];
00371   next_serial_number = (next_serial_number << 8) + data[pos++];
00372   event_block = data[pos++];
00373   event_block = (event_block << 8) + data[pos++];
00374   event_block = (event_block << 8) + data[pos++];
00375   event_block = (event_block << 8) + data[pos++];
00376   return pos;
00377 }
00378 
00379 size_t
00380 Routing_Slip_Persistence_Manager::Routing_Slip_Header::put_header(
00381   Persistent_Storage_Block& psb, size_t offset)
00382 {
00383   // Assume that our psb can hold our small amount of data...
00384   size_t pos = offset;
00385   // Store serial number, next_overflow and data_size
00386   pos = this->Block_Header::put_header(psb, pos);
00387 
00388   unsigned char* data = psb.data();
00389   // Store next_routing_slip_block
00390   data[pos++] = static_cast<unsigned char> (next_routing_slip_block >> 24);
00391   data[pos++] = static_cast<unsigned char> ((next_routing_slip_block >> 16) & 0xff);
00392   data[pos++] = static_cast<unsigned char> ((next_routing_slip_block >> 8) & 0xff);
00393   data[pos++] = static_cast<unsigned char> (next_routing_slip_block & 0xff);
00394   // Store serial_number
00395   data[pos++] = static_cast<unsigned char> ((next_serial_number >> 56) & 0xff);
00396   data[pos++] = static_cast<unsigned char> ((next_serial_number >> 48) & 0xff);
00397   data[pos++] = static_cast<unsigned char> ((next_serial_number >> 40) & 0xff);
00398   data[pos++] = static_cast<unsigned char> ((next_serial_number >> 32) & 0xff);
00399   data[pos++] = static_cast<unsigned char> ((next_serial_number >> 24) & 0xff);
00400   data[pos++] = static_cast<unsigned char> ((next_serial_number >> 16) & 0xff);
00401   data[pos++] = static_cast<unsigned char> ((next_serial_number >> 8) & 0xff);
00402   data[pos++] = static_cast<unsigned char> ((next_serial_number >> 0) & 0xff);
00403   // Store event_block
00404   data[pos++] = static_cast<unsigned char> (event_block >> 24);
00405   data[pos++] = static_cast<unsigned char> ((event_block >> 16) & 0xff);
00406   data[pos++] = static_cast<unsigned char> ((event_block >> 8) & 0xff);
00407   data[pos++] = static_cast<unsigned char> (event_block & 0xff);
00408   return pos;
00409 }
00410 
00411 Routing_Slip_Persistence_Manager::Overflow_Header::Overflow_Header ()
00412   : Block_Header (BT_Overflow)
00413 {
00414 }
00415 
00416 Routing_Slip_Persistence_Manager::Event_Header::Event_Header ()
00417   : Block_Header (BT_Routing_Slip)
00418 {
00419 }
00420 
00421 bool
00422 Routing_Slip_Persistence_Manager::store_i(const ACE_Message_Block& event,
00423   const ACE_Message_Block& routing_slip)
00424 {
00425   bool result = false;
00426 
00427   bool initially_persisted = this->persisted();
00428   if (!initially_persisted)
00429   {
00430     this->factory_->lock.acquire();
00431     this->factory_->preallocate_next_record(this->serial_number_,
00432       this->first_routing_slip_block_,
00433       this->routing_slip_header_.next_serial_number,
00434       this->routing_slip_header_.next_routing_slip_block);
00435     this->routing_slip_header_.serial_number = this->serial_number_;
00436   }
00437 
00438   result = this->build_chain(this->first_routing_slip_block_,
00439     this->routing_slip_header_, this->allocated_routing_slip_blocks_,
00440     routing_slip);
00441 
00442   if (result)
00443   {
00444     // No need for a callback here since we do our own below
00445     result &= this->store_event(event);
00446     // If we have an event block allocated, update our header
00447     if (this->first_event_block_ != 0)
00448     {
00449       this->routing_slip_header_.event_block =
00450         ACE_Utils::truncate_cast<Block_Number> (this->first_event_block_->block_number());
00451     }
00452     else
00453     {
00454       ACE_ERROR((LM_ERROR,
00455         ACE_TEXT(
00456           "(%P|%t) No Event is being stored with this routing slip.\n")
00457         ));
00458     }
00459     // Always write our first block out.
00460     this->dllist_push_back();
00461     result &= (this->write_first_routing_slip_block() != 0);
00462     // because the first rs blocks everywhere have been given sync, we are
00463     // guaranteed that they will be totally written by the time we get to this
00464     // empty callback-only block.
00465     Persistent_Storage_Block* callbackblock =
00466       this->allocator_->allocate_nowrite();
00467     callbackblock->set_callback(this->callback_);
00468     result &= this->allocator_->write(callbackblock);
00469   }
00470   if (!initially_persisted)
00471   {
00472     this->factory_->lock.release();
00473   }
00474   return result;
00475 }
00476 
00477 bool
00478 Routing_Slip_Persistence_Manager::update_i(
00479   const ACE_Message_Block& routing_slip)
00480 {
00481   bool result = true;
00482   size_t routing_slip_size = routing_slip.total_length();
00483   if (routing_slip_size != 0)
00484   {
00485     result = this->build_chain(this->first_routing_slip_block_,
00486       this->routing_slip_header_, this->allocated_routing_slip_blocks_,
00487       routing_slip);
00488 
00489     result &= this->allocator_->write(this->first_routing_slip_block_);
00490   }
00491   Persistent_Storage_Block* callbackblock =
00492     this->allocator_->allocate_nowrite();
00493   callbackblock->set_callback(this->callback_);
00494   result &= this->allocator_->write(callbackblock);
00495   return result;
00496 }
00497 
00498 bool
00499 Routing_Slip_Persistence_Manager::store_event(
00500   const ACE_Message_Block& event)
00501 {
00502   bool result = true;
00503   size_t event_size = event.total_length();
00504   if (event_size != 0)
00505   {
00506     if (this->first_event_block_ == 0)
00507     {
00508       this->first_event_block_ = this->allocator_->allocate();
00509       this->first_event_block_->set_allocator_owns(false);
00510     }
00511 
00512     result = this->build_chain(this->first_event_block_,
00513       this->event_header_, this->allocated_event_blocks_,
00514       event);
00515 
00516     result &= this->allocator_->write(this->first_event_block_);
00517   }
00518   return result;
00519 }
00520 
00521 size_t
00522 Routing_Slip_Persistence_Manager::fill_block(Persistent_Storage_Block& psb,
00523   size_t offset_into_block, const ACE_Message_Block* data,
00524   size_t offset_into_msg)
00525 {
00526   unsigned char* ptr = (unsigned char*)data->rd_ptr();
00527   return this->fill_block(psb, offset_into_block, ptr + offset_into_msg,
00528   data->length() - offset_into_msg);
00529 }
00530 
00531 size_t
00532 Routing_Slip_Persistence_Manager::fill_block(Persistent_Storage_Block& psb,
00533   size_t offset_into_block, unsigned char* data, size_t data_size)
00534 {
00535   size_t result = 0;
00536   if (data_size > 0)
00537   {
00538     const size_t max_size = this->allocator_->block_size() - offset_into_block;
00539     size_t size_to_copy = data_size;
00540     if (size_to_copy > max_size)
00541     {
00542       size_to_copy = max_size;
00543       result = data_size - size_to_copy;
00544     }
00545     else
00546     {
00547       result = 0;
00548     }
00549     ACE_OS::memcpy(psb.data() + offset_into_block, data, size_to_copy);
00550   }
00551   return result;
00552 }
00553 
00554 bool
00555 Routing_Slip_Persistence_Manager::build_chain(
00556     Persistent_Storage_Block* first_block, Block_Header& first_header,
00557     ACE_Unbounded_Stack<size_t>& allocated_blocks,
00558     const ACE_Message_Block& data)
00559 {
00560   size_t data_size = data.total_length();
00561   size_t remainder = data_size;
00562   bool result = true;
00563   // Save the number of items currently on the allocation list for
00564   ACE_Unbounded_Stack<size_t> blocks_to_free;
00565   size_t block_number = 0;
00566 
00567   // reverse the order so when we pop, we free up things closer to block 0
00568   // first
00569   while (allocated_blocks.pop(block_number) == 0)
00570   {
00571     blocks_to_free.push(block_number);
00572   }
00573   size_t pos = first_header.put_header(
00574     *first_block);
00575   const ACE_Message_Block* mblk = &data;
00576   remainder = this->fill_block(*first_block, pos, mblk, 0);
00577   while ((remainder == 0) && (mblk->cont() != 0))
00578   {
00579     pos += mblk->length();
00580     mblk = mblk->cont();
00581     remainder = this->fill_block(*first_block, pos, mblk, 0);
00582   }
00583   first_header.data_size = 
00584     static_cast<TAO_Notify::Routing_Slip_Persistence_Manager::Block_Size> (data_size - remainder);
00585   first_header.next_overflow = 0;
00586 
00587   Block_Header* prevhdr = &first_header;
00588   Persistent_Storage_Block* prevblk = first_block;
00589 
00590   while (remainder > 0)
00591   {
00592     Overflow_Header* hdr = 0;
00593     ACE_NEW_RETURN(hdr, Overflow_Header, result);
00594 
00595     Persistent_Storage_Block* curblk = this->allocator_->allocate();
00596     allocated_blocks.push(curblk->block_number());
00597     // Set the previous block's overflow "pointer" to us.
00598     prevhdr->next_overflow = ACE_Utils::truncate_cast<Block_Number> (curblk->block_number());
00599     prevhdr->put_header(*prevblk);
00600     pos = hdr->put_header(*curblk);
00601     hdr->data_size = 
00602       static_cast<TAO_Notify::Routing_Slip_Persistence_Manager::Block_Size> (remainder);
00603 
00604     size_t offset_into_msg = mblk->length() - remainder;
00605     remainder = this->fill_block(*curblk, pos, mblk, offset_into_msg);
00606     while ((remainder == 0) && (mblk->cont() != 0))
00607     {
00608       pos += mblk->length();
00609       mblk = mblk->cont();
00610       remainder = this->fill_block(*curblk, pos, mblk, 0);
00611     }
00612 
00613     hdr->data_size = hdr->data_size - 
00614       static_cast<TAO_Notify::Routing_Slip_Persistence_Manager::Block_Size> (remainder);
00615     if (prevblk != first_block)
00616     {
00617       // allocator obtains ownership, so write out and delete the header
00618       // only.
00619       result &= this->allocator_->write(prevblk);
00620 
00621       if (prevhdr != &first_header)
00622         delete prevhdr;
00623     }
00624     prevblk = curblk;
00625     prevhdr = hdr;
00626   }
00627   if (prevblk != first_block)
00628   {
00629     prevhdr->put_header(*prevblk);
00630     result &= this->allocator_->write(prevblk);
00631 
00632     if (prevhdr != &first_header)
00633       delete prevhdr;
00634   }
00635   pos = first_header.put_header(
00636     *first_block);
00637   // Free all but the first routing_slip_block
00638   while (blocks_to_free.pop(block_number) == 0)
00639   {
00640     this->allocator_->free(block_number);
00641   }
00642 
00643   return result;
00644 }
00645 
00646 bool
00647 Routing_Slip_Persistence_Manager::reload_chain(
00648   Persistent_Storage_Block* first_block, Block_Header& first_header,
00649   ACE_Unbounded_Stack<size_t>& allocated_blocks,
00650   ACE_Message_Block* amb,
00651   ACE_UINT64 expected_serial_number
00652   )
00653 {
00654   bool result = false;
00655   size_t block_size = this->allocator_->block_size();
00656   if (this->allocator_->read(first_block))
00657   {
00658     size_t pos = 0;
00659     size_t nextptr = 0;
00660     ACE_Message_Block* mbptr = amb;
00661     ACE_Message_Block* mbnew = 0;
00662 
00663     pos = first_header.extract_header(*first_block);
00664     if (first_header.serial_number == expected_serial_number)
00665     {
00666       // We have to copy the first block because we cache it.
00667       ACE_OS::memcpy(mbptr->wr_ptr(), first_block->data(),
00668         block_size);
00669       mbptr->rd_ptr(pos);
00670       mbptr->wr_ptr(pos + first_header.data_size);
00671       nextptr = first_header.next_overflow;
00672       while (nextptr != 0)
00673       {
00674         Overflow_Header overflow_header;
00675         ACE_NEW_RETURN(mbnew, ACE_Message_Block(block_size), result);
00676         mbptr->cont(mbnew);
00677         Persistent_Storage_Block* psb = this->allocator_->allocate_at(nextptr);
00678         mbptr = mbnew;
00679         // Deallocate the PSB's data and reallocate it to our wr_ptr()...
00680         psb->reassign_data(static_cast<unsigned char*> (static_cast<void*> (mbptr->wr_ptr())), true);
00681         // ...read into the PSB (whose data is inside of the AMB)...
00682         this->allocator_->read(psb);
00683         allocated_blocks.push(psb->block_number());
00684         // ...extract all headers so we know the data's size...
00685         pos = overflow_header.extract_header(*psb);
00686         // ...set up the region that somebody else can look at...
00687         mbptr->rd_ptr(pos);
00688         mbptr->wr_ptr(pos + overflow_header.data_size);
00689         // ...then make sure we don't delete data since we don't own it.
00690         psb->reassign_data(0);
00691         delete psb;
00692         nextptr = overflow_header.next_overflow;
00693       }
00694       result = true;
00695     }
00696   }
00697   return result;
00698 }
00699 
00700 bool
00701 Routing_Slip_Persistence_Manager::update_next_manager(
00702   Routing_Slip_Persistence_Manager* next)
00703 {
00704   bool result = false;
00705   ACE_GUARD_RETURN(TAO_SYNCH_MUTEX, ace_mon, this->lock_, result);
00706   ACE_ASSERT(this->persisted());
00707   if (!this->removed_)
00708   {
00709     bool updated = false;
00710     if (this->next_manager_ != 0)
00711     {
00712       if (this->routing_slip_header_.next_serial_number !=
00713         next->routing_slip_header_.next_serial_number)
00714       {
00715         this->routing_slip_header_.next_serial_number =
00716           next->routing_slip_header_.next_serial_number;
00717         updated = true;
00718       }
00719       if (this->routing_slip_header_.next_routing_slip_block !=
00720         next->routing_slip_header_.next_routing_slip_block)
00721       {
00722         this->routing_slip_header_.next_routing_slip_block =
00723           next->routing_slip_header_.next_routing_slip_block;
00724         updated = true;
00725       }
00726     }
00727     if (updated)
00728     {
00729       this->write_first_routing_slip_block();
00730     }
00731   }
00732   return result;
00733 }
00734 
00735 bool
00736 Routing_Slip_Persistence_Manager::persisted()
00737 {
00738   return (0 != this->first_routing_slip_block_);
00739 }
00740 
00741 bool
00742 Routing_Slip_Persistence_Manager::is_root () const
00743 {
00744   return this->serial_number_ == ROUTING_SLIP_ROOT_SERIAL_NUMBER;
00745 }
00746 
00747 void
00748 Routing_Slip_Persistence_Manager::release_all ()
00749 {
00750   ACE_ASSERT(is_root());
00751   while (this->next_manager_ != this)
00752   {
00753     Routing_Slip_Persistence_Manager * next = this->next_manager_;
00754     next->remove_from_dllist();
00755     ACE_ASSERT(next != this->next_manager_);
00756     delete next;
00757   }
00758 }
00759 
00760 size_t
00761 Routing_Slip_Persistence_Manager::write_first_routing_slip_block(
00762   bool prepare_only)
00763 {
00764   size_t pos = this->routing_slip_header_.put_header(
00765     *this->first_routing_slip_block_);
00766   if (!prepare_only)
00767   {
00768     this->allocator_->write(this->first_routing_slip_block_);
00769   }
00770   return pos;
00771 }
00772 
00773 void
00774 Routing_Slip_Persistence_Manager::dllist_push_back()
00775 {
00776   insert_before (&this->factory_->root());
00777 }
00778 
00779 void
00780 Routing_Slip_Persistence_Manager::insert_before (Routing_Slip_Persistence_Manager * node)
00781 {
00782   // Since this is a private function, the caller should have done locking
00783   // on the factory before calling here.  The same is true for removals.
00784   ACE_ASSERT(this->prev_manager_ == this);
00785   ACE_ASSERT(this->next_manager_ == this);
00786   ACE_ASSERT(node != this);
00787   this->prev_manager_ = node->prev_manager_;
00788   node->prev_manager_ = this;
00789   this->next_manager_ = node;
00790   this->prev_manager_->next_manager_ = this;
00791 }
00792 
00793 void
00794 Routing_Slip_Persistence_Manager::remove_from_dllist()
00795 {
00796   // Since this is a private function, the caller should have done locking
00797   // on the factory before calling here.  The same is true for insertions.
00798   ACE_ASSERT(this->persisted());
00799   ACE_ASSERT(this->prev_manager_ != this);
00800   ACE_ASSERT(this->next_manager_ != this);
00801   this->prev_manager_->next_manager_ = this->next_manager_;
00802   this->next_manager_->prev_manager_ = this->prev_manager_;
00803   this->prev_manager_ = this;
00804   this->next_manager_ = this;
00805 }
00806 
00807 } /* namespace TAO_Notify */
00808 
00809 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:45:29 2010 for TAO_CosNotification by  doxygen 1.4.7