Routing_Slip_Persistence_Manager.cpp

Go to the documentation of this file.
00001 // Routing_Slip_Persistence_Manager.cpp,v 1.12 2006/03/16 15:53:36 sjiang Exp
00002 
00003 #include "orbsvcs/Notify/Routing_Slip_Persistence_Manager.h"
00004 #include "orbsvcs/Notify/Standard_Event_Persistence.h"
00005 #include "orbsvcs/Notify/Persistent_File_Allocator.h"
00006 
00007 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00008 
00009 namespace TAO_Notify
00010 {
00011 
00012 Routing_Slip_Persistence_Manager::Routing_Slip_Persistence_Manager(
00013   Standard_Event_Persistence_Factory* factory)
00014   : removed_(false)
00015   , serial_number_(0)
00016   , allocator_(factory->allocator())
00017   , factory_(factory)
00018   , first_event_block_(0)
00019   , first_routing_slip_block_(0)
00020   , callback_(0)
00021   , event_mb_ (0)
00022   , routing_slip_mb_(0)
00023 {
00024   this->prev_manager_ = this;
00025   this->next_manager_ = this;
00026 }
00027 
00028 Routing_Slip_Persistence_Manager::~Routing_Slip_Persistence_Manager()
00029 {
00030   ACE_ASSERT(this->prev_manager_ == this);
00031   ACE_ASSERT(this->next_manager_ == this);
00032   delete this->first_event_block_;
00033   this->first_event_block_ = 0;
00034   delete this->first_routing_slip_block_;
00035   this->first_routing_slip_block_ = 0;
00036   delete this->event_mb_;
00037   this->event_mb_ = 0;
00038   delete this->routing_slip_mb_;
00039   this->routing_slip_mb_ = 0;
00040 }
00041 
00042 void
00043 Routing_Slip_Persistence_Manager::set_callback(Persistent_Callback* callback)
00044 {
00045   ACE_GUARD(ACE_SYNCH_MUTEX, ace_mon, this->lock_);
00046   this->callback_ = callback;
00047 }
00048 
00049 bool
00050 Routing_Slip_Persistence_Manager::store_root()
00051 {
00052   bool result = false;
00053 
00054   this->factory_->get_preallocated_pointer (
00055     this->routing_slip_header_.next_serial_number,
00056     this->routing_slip_header_.next_routing_slip_block);
00057 
00058   // we should already have a psb, but JIC
00059   ACE_ASSERT(this->first_routing_slip_block_ != 0);
00060   ACE_ASSERT(this->first_routing_slip_block_->block_number() ==
00061     ROUTING_SLIP_ROOT_BLOCK_NUMBER);
00062 
00063   // Don't take any chances.  Use hard-wired root serial number.
00064   this->routing_slip_header_.serial_number = ROUTING_SLIP_ROOT_SERIAL_NUMBER;
00065 
00066   // This will eventually break after something like 58000 years.
00067   // At such time we should change this to !=.
00068   ACE_ASSERT(this->routing_slip_header_.next_serial_number >
00069     ROUTING_SLIP_ROOT_SERIAL_NUMBER);
00070 
00071   ACE_Message_Block versioninfo(2);
00072   versioninfo.wr_ptr()[0] = 1; // Major version number
00073   versioninfo.wr_ptr()[1] = 0; // Minor version number
00074   versioninfo.wr_ptr(2);
00075   ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, ace_mon, this->lock_, result);
00076   result = this->build_chain(this->first_routing_slip_block_,
00077     this->routing_slip_header_, this->allocated_routing_slip_blocks_,
00078     versioninfo);
00079   if (result)
00080   {
00081    this->routing_slip_header_.put_header(*this->first_routing_slip_block_);
00082    this->allocator_->write(this->first_routing_slip_block_);
00083   }
00084   return result;
00085 }
00086 
00087 bool
00088 Routing_Slip_Persistence_Manager::reload(
00089   ACE_Message_Block*& event,
00090   ACE_Message_Block*& routing_slip)
00091 {
00092   bool result = false;
00093   if (this->event_mb_ != 0 && this->routing_slip_mb_ != 0)
00094   {
00095     event = this->event_mb_;
00096     this->event_mb_ = 0;
00097     routing_slip = this->routing_slip_mb_;
00098     this->routing_slip_mb_ = 0;
00099     result = true;
00100   }
00101   else
00102   {
00103     event = 0;
00104     routing_slip = 0;
00105   }
00106   return result;
00107 }
00108 
00109 bool
00110 Routing_Slip_Persistence_Manager::load(
00111   Block_Number block_number,
00112   Block_Serial_Number expected_serial_number)
00113 {
00114   /**
00115    * NOTE: There is no need to worry about guarding anything.  We assume
00116    *       that there will be one and only one thread doing the entire
00117    *       reload process.
00118    */
00119   bool result = false;
00120   size_t block_size = this->allocator_->block_size();
00121   this->first_routing_slip_block_ =
00122     this->allocator_->allocate_at(block_number);
00123   this->first_routing_slip_block_->set_allocator_owns(false);
00124   this->first_routing_slip_block_->set_sync();
00125 
00126   this->serial_number_ = expected_serial_number;
00127 
00128   ACE_NEW_NORETURN(this->routing_slip_mb_, ACE_Message_Block(block_size));
00129   ACE_NEW_NORETURN(this->event_mb_, ACE_Message_Block(block_size));
00130   if (this->event_mb_ != 0 && this->routing_slip_mb_ != 0)
00131   {
00132     if (this->reload_chain(
00133           this->first_routing_slip_block_,
00134           this->routing_slip_header_,
00135           this->allocated_routing_slip_blocks_,
00136           this->routing_slip_mb_,
00137           expected_serial_number))
00138     {
00139       if (this->routing_slip_header_.event_block != 0)
00140       {
00141         this->first_event_block_ = this->allocator_->allocate_at(
00142           this->routing_slip_header_.event_block);
00143         result = this->reload_chain(
00144           this->first_event_block_,
00145           this->event_header_,
00146           this->allocated_event_blocks_,
00147           this->event_mb_,
00148           0);
00149       }
00150       else if (block_number == ROUTING_SLIP_ROOT_BLOCK_NUMBER)
00151       {
00152         // only the root can lack event
00153         result = true;
00154       }
00155       else
00156       {
00157         ACE_ERROR((LM_ERROR,
00158           ACE_TEXT(
00159             "(%P|%t) Reloaded Persistent Event is missing event.\n")
00160           ));
00161       }
00162     }
00163   }
00164   if (! result)
00165   {
00166     delete this->routing_slip_mb_;
00167     this->routing_slip_mb_ = 0;
00168     delete this->event_mb_;
00169     this->event_mb_ = 0;
00170   }
00171   return result;
00172 }
00173 
00174 Routing_Slip_Persistence_Manager *
00175 Routing_Slip_Persistence_Manager::load_next ()
00176 {
00177   Routing_Slip_Persistence_Manager * result;
00178   ACE_NEW_RETURN(result, Routing_Slip_Persistence_Manager (this->factory_), 0);
00179 
00180   if (result->load(this->routing_slip_header_.next_routing_slip_block,
00181     this->routing_slip_header_.next_serial_number))
00182   {
00183     result->dllist_push_back();
00184   }
00185   else
00186   {
00187     // steal the psb for use as the next psb.
00188     // delete the rspm.  We'll create another one later.
00189     Persistent_Storage_Block * next_psb = result->first_routing_slip_block_;
00190     result->first_routing_slip_block_ = 0;
00191 //    next_psb->set_allocator_owns(true);
00192     this->factory_->done_reloading (
00193       next_psb,
00194       result->serial_number_);
00195     delete result;
00196     result = 0;
00197   }
00198   return result;
00199 }
00200 
00201 bool
00202 Routing_Slip_Persistence_Manager::store(const ACE_Message_Block& event,
00203   const ACE_Message_Block& routing_slip)
00204 {
00205   bool result = false;
00206   ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, ace_mon, this->lock_, result);
00207   if (!this->removed_)
00208   {
00209     result = store_i(event, routing_slip);
00210   }
00211   return result;
00212 }
00213 
00214 bool
00215 Routing_Slip_Persistence_Manager::update(const ACE_Message_Block& routing_slip)
00216 {
00217   bool result = false;
00218   ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, ace_mon, this->lock_, result);
00219   // If we have not gotten the event yet or we have no allocator, fail
00220   if (!this->removed_)
00221   {
00222     if (this->persisted())
00223     {
00224       result = update_i(routing_slip);
00225     }
00226   }
00227   return result;
00228 }
00229 
00230 bool
00231 Routing_Slip_Persistence_Manager::remove()
00232 {
00233   bool result = false;
00234   ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, ace_mon, this->lock_, result);
00235   // Assert that this is in the dllist
00236   ACE_ASSERT(this->prev_manager_ != this);
00237   ACE_ASSERT(this->persisted());
00238   Routing_Slip_Persistence_Manager* prev = this->prev_manager_;
00239   // Once our previous manager removes us, we can deallocate in any order
00240   this->factory_->lock.acquire();
00241   this->remove_from_dllist();
00242   result = prev->update_next_manager(this);
00243   this->factory_->lock.release();
00244   size_t block_number = 0;
00245   if (this->first_routing_slip_block_ != 0)
00246   {
00247     this->allocator_->free(this->first_routing_slip_block_->block_number());
00248     delete this->first_routing_slip_block_;
00249     this->first_routing_slip_block_ = 0;
00250   }
00251   if (this->first_event_block_ != 0)
00252   {
00253     this->allocator_->free(this->first_event_block_->block_number());
00254     delete this->first_event_block_;
00255     this->first_event_block_ = 0;
00256   }
00257   while (this->allocated_routing_slip_blocks_.pop(block_number) == 0)
00258   {
00259     this->allocator_->free(block_number);
00260   }
00261   while (this->allocated_event_blocks_.pop(block_number) == 0)
00262   {
00263     this->allocator_->free(block_number);
00264   }
00265   this->removed_ = true;
00266   Persistent_Storage_Block* callbackblock =
00267     this->allocator_->allocate_nowrite();
00268   callbackblock->set_callback(this->callback_);
00269   result &= this->allocator_->write(callbackblock);
00270   return result;
00271 }
00272 
00273 Routing_Slip_Persistence_Manager::Block_Header::Block_Header(Header_Type type)
00274   : serial_number (0)
00275   , next_overflow(0)
00276   , header_type (static_cast<Block_Type> (type))
00277   , data_size(0)
00278 {
00279 }
00280 Routing_Slip_Persistence_Manager::Block_Header::~Block_Header (void)
00281 {
00282 }
00283 
00284 size_t
00285 Routing_Slip_Persistence_Manager::Block_Header::extract_header(
00286   Persistent_Storage_Block& psb, size_t offset)
00287 {
00288   size_t pos = offset;
00289   unsigned char* data = psb.data();
00290 
00291   serial_number = data[pos++];
00292   serial_number = (serial_number << 8) + data[pos++];
00293   serial_number = (serial_number << 8) + data[pos++];
00294   serial_number = (serial_number << 8) + data[pos++];
00295   serial_number = (serial_number << 8) + data[pos++];
00296   serial_number = (serial_number << 8) + data[pos++];
00297   serial_number = (serial_number << 8) + data[pos++];
00298   serial_number = (serial_number << 8) + data[pos++];
00299 
00300   next_overflow = data[pos++];
00301   next_overflow = (next_overflow << 8) + data[pos++];
00302   next_overflow = (next_overflow << 8) + data[pos++];
00303   next_overflow = (next_overflow << 8) + data[pos++];
00304 
00305   header_type = data[pos++];
00306   header_type = (data_size << 8) + data[pos++];
00307 
00308   data_size = data[pos++];
00309   data_size = (data_size << 8) + data[pos++];
00310   return pos;
00311 }
00312 
00313 size_t
00314 Routing_Slip_Persistence_Manager::Block_Header::put_header(
00315   Persistent_Storage_Block& psb, size_t offset)
00316 {
00317   // Assume that our psb can hold our small amount of data...
00318   size_t pos = offset;
00319   unsigned char* data = psb.data();
00320   // Store serial_number
00321   data[pos++] = static_cast<unsigned char> ((serial_number >> 56) & 0xff);
00322   data[pos++] = static_cast<unsigned char> ((serial_number >> 48) & 0xff);
00323   data[pos++] = static_cast<unsigned char> ((serial_number >> 40) & 0xff);
00324   data[pos++] = static_cast<unsigned char> ((serial_number >> 32) & 0xff);
00325   data[pos++] = static_cast<unsigned char> ((serial_number >> 24) & 0xff);
00326   data[pos++] = static_cast<unsigned char> ((serial_number >> 16) & 0xff);
00327   data[pos++] = static_cast<unsigned char> ((serial_number >> 8) & 0xff);
00328   data[pos++] = static_cast<unsigned char> ((serial_number >> 0) & 0xff);
00329   // Store next_overflow
00330   data[pos++] = static_cast<unsigned char> (next_overflow >> 24);
00331   data[pos++] = static_cast<unsigned char> ((next_overflow >> 16) & 0xff);
00332   data[pos++] = static_cast<unsigned char> ((next_overflow >> 8) & 0xff);
00333   data[pos++] = static_cast<unsigned char> (next_overflow & 0xff);
00334   // Store header_type
00335   data[pos++] = static_cast<unsigned char> ((header_type >> 8) & 0xff);
00336   data[pos++] = static_cast<unsigned char> (header_type & 0xff);
00337   // Store data_size
00338   data[pos++] = static_cast<unsigned char> ((data_size >> 8) & 0xff);
00339   data[pos++] = static_cast<unsigned char> (data_size & 0xff);
00340 
00341   return pos;
00342 }
00343 
00344 Routing_Slip_Persistence_Manager::Routing_Slip_Header::Routing_Slip_Header()
00345   : Block_Header (BT_Event)
00346   , next_routing_slip_block(0)
00347   , next_serial_number(0)
00348   , event_block(0)
00349 {
00350 }
00351 
00352 size_t
00353 Routing_Slip_Persistence_Manager::Routing_Slip_Header::extract_header(
00354   Persistent_Storage_Block& psb, size_t offset)
00355 {
00356   size_t pos = offset;
00357   pos = this->Block_Header::extract_header(psb, pos);
00358   unsigned char* data = psb.data();
00359   next_routing_slip_block = data[pos++];
00360   next_routing_slip_block = (next_routing_slip_block << 8) + data[pos++];
00361   next_routing_slip_block = (next_routing_slip_block << 8) + data[pos++];
00362   next_routing_slip_block = (next_routing_slip_block << 8) + data[pos++];
00363   next_serial_number = data[pos++];
00364   next_serial_number = (next_serial_number << 8) + data[pos++];
00365   next_serial_number = (next_serial_number << 8) + data[pos++];
00366   next_serial_number = (next_serial_number << 8) + data[pos++];
00367   next_serial_number = (next_serial_number << 8) + data[pos++];
00368   next_serial_number = (next_serial_number << 8) + data[pos++];
00369   next_serial_number = (next_serial_number << 8) + data[pos++];
00370   next_serial_number = (next_serial_number << 8) + data[pos++];
00371   event_block = data[pos++];
00372   event_block = (event_block << 8) + data[pos++];
00373   event_block = (event_block << 8) + data[pos++];
00374   event_block = (event_block << 8) + data[pos++];
00375   return pos;
00376 }
00377 
00378 size_t
00379 Routing_Slip_Persistence_Manager::Routing_Slip_Header::put_header(
00380   Persistent_Storage_Block& psb, size_t offset)
00381 {
00382   // Assume that our psb can hold our small amount of data...
00383   size_t pos = offset;
00384   // Store serial number, next_overflow and data_size
00385   pos = this->Block_Header::put_header(psb, pos);
00386 
00387   unsigned char* data = psb.data();
00388   // Store next_routing_slip_block
00389   data[pos++] = static_cast<unsigned char> (next_routing_slip_block >> 24);
00390   data[pos++] = static_cast<unsigned char> ((next_routing_slip_block >> 16) & 0xff);
00391   data[pos++] = static_cast<unsigned char> ((next_routing_slip_block >> 8) & 0xff);
00392   data[pos++] = static_cast<unsigned char> (next_routing_slip_block & 0xff);
00393   // Store serial_number
00394   data[pos++] = static_cast<unsigned char> ((next_serial_number >> 56) & 0xff);
00395   data[pos++] = static_cast<unsigned char> ((next_serial_number >> 48) & 0xff);
00396   data[pos++] = static_cast<unsigned char> ((next_serial_number >> 40) & 0xff);
00397   data[pos++] = static_cast<unsigned char> ((next_serial_number >> 32) & 0xff);
00398   data[pos++] = static_cast<unsigned char> ((next_serial_number >> 24) & 0xff);
00399   data[pos++] = static_cast<unsigned char> ((next_serial_number >> 16) & 0xff);
00400   data[pos++] = static_cast<unsigned char> ((next_serial_number >> 8) & 0xff);
00401   data[pos++] = static_cast<unsigned char> ((next_serial_number >> 0) & 0xff);
00402   // Store event_block
00403   data[pos++] = static_cast<unsigned char> (event_block >> 24);
00404   data[pos++] = static_cast<unsigned char> ((event_block >> 16) & 0xff);
00405   data[pos++] = static_cast<unsigned char> ((event_block >> 8) & 0xff);
00406   data[pos++] = static_cast<unsigned char> (event_block & 0xff);
00407   return pos;
00408 }
00409 
00410 Routing_Slip_Persistence_Manager::Overflow_Header::Overflow_Header ()
00411   : Block_Header (BT_Overflow)
00412 {
00413 }
00414 
00415 Routing_Slip_Persistence_Manager::Event_Header::Event_Header ()
00416   : Block_Header (BT_Routing_Slip)
00417 {
00418 }
00419 
00420 bool
00421 Routing_Slip_Persistence_Manager::store_i(const ACE_Message_Block& event,
00422   const ACE_Message_Block& routing_slip)
00423 {
00424   bool result = false;
00425 
00426   bool initially_persisted = this->persisted();
00427   if (!initially_persisted)
00428   {
00429     this->factory_->lock.acquire();
00430     this->factory_->preallocate_next_record(this->serial_number_,
00431       this->first_routing_slip_block_,
00432       this->routing_slip_header_.next_serial_number,
00433       this->routing_slip_header_.next_routing_slip_block);
00434     this->routing_slip_header_.serial_number = this->serial_number_;
00435   }
00436 
00437   result = this->build_chain(this->first_routing_slip_block_,
00438     this->routing_slip_header_, this->allocated_routing_slip_blocks_,
00439     routing_slip);
00440 
00441   if (result)
00442   {
00443     // No need for a callback here since we do our own below
00444     result &= this->store_event(event);
00445     // If we have an event block allocated, update our header
00446     if (this->first_event_block_ != 0)
00447     {
00448       this->routing_slip_header_.event_block =
00449         this->first_event_block_->block_number();
00450     }
00451     else
00452     {
00453       ACE_ERROR((LM_ERROR,
00454         ACE_TEXT(
00455           "(%P|%t) No Event is being stored with this routing slip.\n")
00456         ));
00457     }
00458     // Always write our first block out.
00459     this->dllist_push_back();
00460     result &= (this->write_first_routing_slip_block() != 0);
00461     // because the first rs blocks everywhere have been given sync, we are
00462     // guaranteed that they will be totally written by the time we get to this
00463     // empty callback-only block.
00464     Persistent_Storage_Block* callbackblock =
00465       this->allocator_->allocate_nowrite();
00466     callbackblock->set_callback(this->callback_);
00467     result &= this->allocator_->write(callbackblock);
00468   }
00469   if (!initially_persisted)
00470   {
00471     this->factory_->lock.release();
00472   }
00473   return result;
00474 }
00475 
00476 bool
00477 Routing_Slip_Persistence_Manager::update_i(
00478   const ACE_Message_Block& routing_slip)
00479 {
00480   bool result = true;
00481   size_t routing_slip_size = routing_slip.total_length();
00482   if (routing_slip_size != 0)
00483   {
00484     result = this->build_chain(this->first_routing_slip_block_,
00485       this->routing_slip_header_, this->allocated_routing_slip_blocks_,
00486       routing_slip);
00487 
00488     result &= this->allocator_->write(this->first_routing_slip_block_);
00489   }
00490   Persistent_Storage_Block* callbackblock =
00491     this->allocator_->allocate_nowrite();
00492   callbackblock->set_callback(this->callback_);
00493   result &= this->allocator_->write(callbackblock);
00494   return result;
00495 }
00496 
00497 bool
00498 Routing_Slip_Persistence_Manager::store_event(
00499   const ACE_Message_Block& event)
00500 {
00501   bool result = true;
00502   size_t event_size = event.total_length();
00503   if (event_size != 0)
00504   {
00505     if (this->first_event_block_ == 0)
00506     {
00507       this->first_event_block_ = this->allocator_->allocate();
00508       this->first_event_block_->set_allocator_owns(false);
00509     }
00510 
00511     result = this->build_chain(this->first_event_block_,
00512       this->event_header_, this->allocated_event_blocks_,
00513       event);
00514 
00515     result &= this->allocator_->write(this->first_event_block_);
00516   }
00517   return result;
00518 }
00519 
00520 size_t
00521 Routing_Slip_Persistence_Manager::fill_block(Persistent_Storage_Block& psb,
00522   size_t offset_into_block, const ACE_Message_Block* data,
00523   size_t offset_into_msg)
00524 {
00525   unsigned char* ptr = (unsigned char*)data->rd_ptr();
00526   return this->fill_block(psb, offset_into_block, ptr + offset_into_msg,
00527   data->length() - offset_into_msg);
00528 }
00529 
00530 size_t
00531 Routing_Slip_Persistence_Manager::fill_block(Persistent_Storage_Block& psb,
00532   size_t offset_into_block, unsigned char* data, size_t data_size)
00533 {
00534   size_t result = 0;
00535   if (data_size > 0)
00536   {
00537     const size_t max_size = this->allocator_->block_size() - offset_into_block;
00538     size_t size_to_copy = data_size;
00539     if (size_to_copy > max_size)
00540     {
00541       size_to_copy = max_size;
00542       result = data_size - size_to_copy;
00543     }
00544     else
00545     {
00546       result = 0;
00547     }
00548     ACE_OS::memcpy(psb.data() + offset_into_block, data, size_to_copy);
00549   }
00550   return result;
00551 }
00552 
00553 bool
00554 Routing_Slip_Persistence_Manager::build_chain(
00555     Persistent_Storage_Block* first_block, Block_Header& first_header,
00556     ACE_Unbounded_Stack<size_t>& allocated_blocks,
00557     const ACE_Message_Block& data)
00558 {
00559   size_t data_size = data.total_length();
00560   size_t remainder = data_size;
00561   bool result = true;
00562   // Save the number of items currently on the allocation list for
00563   ACE_Unbounded_Stack<size_t> blocks_to_free;
00564   size_t block_number = 0;
00565 
00566   // reverse the order so when we pop, we free up things closer to block 0
00567   // first
00568   while (allocated_blocks.pop(block_number) == 0)
00569   {
00570     blocks_to_free.push(block_number);
00571   }
00572   size_t pos = first_header.put_header(
00573     *first_block);
00574   const ACE_Message_Block* mblk = &data;
00575   remainder = this->fill_block(*first_block, pos, mblk, 0);
00576   while ((remainder == 0) && (mblk->cont() != 0))
00577   {
00578     pos += mblk->length();
00579     mblk = mblk->cont();
00580     remainder = this->fill_block(*first_block, pos, mblk, 0);
00581   }
00582   first_header.data_size = 
00583     static_cast<TAO_Notify::Routing_Slip_Persistence_Manager::Block_Size> (data_size - remainder);
00584   first_header.next_overflow = 0;
00585 
00586   Block_Header* prevhdr = &first_header;
00587   Persistent_Storage_Block* prevblk = first_block;
00588 
00589   while (remainder > 0)
00590   {
00591     Overflow_Header* hdr = 0;
00592     ACE_NEW_RETURN(hdr, Overflow_Header, result);
00593 
00594     Persistent_Storage_Block* curblk = this->allocator_->allocate();
00595     allocated_blocks.push(curblk->block_number());
00596     // Set the previous block's overflow "pointer" to us.
00597     prevhdr->next_overflow = curblk->block_number();
00598     prevhdr->put_header(*prevblk);
00599     pos = hdr->put_header(*curblk);
00600     hdr->data_size = 
00601       static_cast<TAO_Notify::Routing_Slip_Persistence_Manager::Block_Size> (remainder);
00602 
00603     size_t offset_into_msg = mblk->length() - remainder;
00604     remainder = this->fill_block(*curblk, pos, mblk, offset_into_msg);
00605     while ((remainder == 0) && (mblk->cont() != 0))
00606     {
00607       pos += mblk->length();
00608       mblk = mblk->cont();
00609       remainder = this->fill_block(*curblk, pos, mblk, 0);
00610     }
00611 
00612     hdr->data_size = hdr->data_size - 
00613       static_cast<TAO_Notify::Routing_Slip_Persistence_Manager::Block_Size> (remainder);
00614     if (prevblk != first_block)
00615     {
00616       // allocator obtains ownership, so write out and delete the header
00617       // only.
00618       result &= this->allocator_->write(prevblk);
00619       delete prevhdr;
00620     }
00621     prevblk = curblk;
00622     prevhdr = hdr;
00623   }
00624   if (prevblk != first_block)
00625   {
00626     prevhdr->put_header(*prevblk);
00627     result &= this->allocator_->write(prevblk);
00628     delete prevhdr;
00629   }
00630   pos = first_header.put_header(
00631     *first_block);
00632   // Free all but the first routing_slip_block
00633   while (blocks_to_free.pop(block_number) == 0)
00634   {
00635     this->allocator_->free(block_number);
00636   }
00637 
00638   return result;
00639 }
00640 
00641 bool
00642 Routing_Slip_Persistence_Manager::reload_chain(
00643   Persistent_Storage_Block* first_block, Block_Header& first_header,
00644   ACE_Unbounded_Stack<size_t>& allocated_blocks,
00645   ACE_Message_Block* amb,
00646   ACE_UINT64 expected_serial_number
00647   )
00648 {
00649   bool result = false;
00650   size_t block_size = this->allocator_->block_size();
00651   if (this->allocator_->read(first_block))
00652   {
00653     size_t pos = 0;
00654     size_t nextptr = 0;
00655     ACE_Message_Block* mbptr = amb;
00656     ACE_Message_Block* mbnew = 0;
00657 
00658     pos = first_header.extract_header(*first_block);
00659     if (first_header.serial_number == expected_serial_number)
00660     {
00661       // We have to copy the first block because we cache it.
00662       ACE_OS::memcpy(mbptr->wr_ptr(), first_block->data(),
00663         block_size);
00664       mbptr->rd_ptr(pos);
00665       mbptr->wr_ptr(pos + first_header.data_size);
00666       nextptr = first_header.next_overflow;
00667       while (nextptr != 0)
00668       {
00669         Overflow_Header overflow_header;
00670         ACE_NEW_RETURN(mbnew, ACE_Message_Block(block_size), result);
00671         mbptr->cont(mbnew);
00672         Persistent_Storage_Block* psb = this->allocator_->allocate_at(nextptr);
00673         mbptr = mbnew;
00674         // Deallocate the PSB's data and reallocate it to our wr_ptr()...
00675         psb->reassign_data(static_cast<unsigned char*> (static_cast<void*> (mbptr->wr_ptr())), true);
00676         // ...read into the PSB (whose data is inside of the AMB)...
00677         this->allocator_->read(psb);
00678         allocated_blocks.push(psb->block_number());
00679         // ...extract all headers so we know the data's size...
00680         pos = overflow_header.extract_header(*psb);
00681         // ...set up the region that somebody else can look at...
00682         mbptr->rd_ptr(pos);
00683         mbptr->wr_ptr(pos + overflow_header.data_size);
00684         // ...then make sure we don't delete data since we don't own it.
00685         psb->reassign_data(0);
00686         delete psb;
00687         nextptr = overflow_header.next_overflow;
00688       }
00689       result = true;
00690     }
00691   }
00692   return result;
00693 }
00694 
00695 bool
00696 Routing_Slip_Persistence_Manager::update_next_manager(
00697   Routing_Slip_Persistence_Manager* next)
00698 {
00699   bool result = false;
00700   ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, ace_mon, this->lock_, result);
00701   ACE_ASSERT(this->persisted());
00702   if (!this->removed_)
00703   {
00704     bool updated = false;
00705     if (this->next_manager_ != 0)
00706     {
00707       if (this->routing_slip_header_.next_serial_number !=
00708         next->routing_slip_header_.next_serial_number)
00709       {
00710         this->routing_slip_header_.next_serial_number =
00711           next->routing_slip_header_.next_serial_number;
00712         updated = true;
00713       }
00714       if (this->routing_slip_header_.next_routing_slip_block !=
00715         next->routing_slip_header_.next_routing_slip_block)
00716       {
00717         this->routing_slip_header_.next_routing_slip_block =
00718           next->routing_slip_header_.next_routing_slip_block;
00719         updated = true;
00720       }
00721     }
00722     if (updated)
00723     {
00724       this->write_first_routing_slip_block();
00725     }
00726   }
00727   return result;
00728 }
00729 
00730 bool
00731 Routing_Slip_Persistence_Manager::persisted()
00732 {
00733   return (0 != this->first_routing_slip_block_);
00734 }
00735 
00736 bool
00737 Routing_Slip_Persistence_Manager::is_root () const
00738 {
00739   return this->serial_number_ == ROUTING_SLIP_ROOT_SERIAL_NUMBER;
00740 }
00741 
00742 void
00743 Routing_Slip_Persistence_Manager::release_all ()
00744 {
00745   ACE_ASSERT(is_root());
00746   while (this->next_manager_ != this)
00747   {
00748     Routing_Slip_Persistence_Manager * next = this->next_manager_;
00749     next->remove_from_dllist();
00750     ACE_ASSERT(next != this->next_manager_);
00751     delete next;
00752   }
00753 }
00754 
00755 size_t
00756 Routing_Slip_Persistence_Manager::write_first_routing_slip_block(
00757   bool prepare_only)
00758 {
00759   size_t pos = this->routing_slip_header_.put_header(
00760     *this->first_routing_slip_block_);
00761   if (!prepare_only)
00762   {
00763     this->allocator_->write(this->first_routing_slip_block_);
00764   }
00765   return pos;
00766 }
00767 
00768 void
00769 Routing_Slip_Persistence_Manager::dllist_push_back()
00770 {
00771   insert_before (&this->factory_->root());
00772 }
00773 
00774 void
00775 Routing_Slip_Persistence_Manager::insert_before (Routing_Slip_Persistence_Manager * node)
00776 {
00777   // Since this is a private function, the caller should have done locking
00778   // on the factory before calling here.  The same is true for removals.
00779   ACE_ASSERT(this->prev_manager_ == this);
00780   ACE_ASSERT(this->next_manager_ == this);
00781   ACE_ASSERT(node != this);
00782   this->prev_manager_ = node->prev_manager_;
00783   node->prev_manager_ = this;
00784   this->next_manager_ = node;
00785   this->prev_manager_->next_manager_ = this;
00786 }
00787 
00788 void
00789 Routing_Slip_Persistence_Manager::remove_from_dllist()
00790 {
00791   // Since this is a private function, the caller should have done locking
00792   // on the factory before calling here.  The same is true for insertions.
00793   ACE_ASSERT(this->persisted());
00794   ACE_ASSERT(this->prev_manager_ != this);
00795   ACE_ASSERT(this->next_manager_ != this);
00796   this->prev_manager_->next_manager_ = this->next_manager_;
00797   this->next_manager_->prev_manager_ = this->prev_manager_;
00798   this->prev_manager_ = this;
00799   this->next_manager_ = this;
00800 }
00801 
00802 } /* namespace TAO_Notify */
00803 
00804 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:24:15 2006 for TAO_CosNotification by doxygen 1.3.6