Incoming_Message_Queue.cpp

Go to the documentation of this file.
00001 #include "tao/Incoming_Message_Queue.h"
00002 #include "tao/debug.h"
00003 
00004 #include "ace/Log_Msg.h"
00005 #include "ace/Malloc_Base.h"
00006 
00007 
00008 #if !defined (__ACE_INLINE__)
00009 # include "tao/Incoming_Message_Queue.inl"
00010 #endif /* __ACE_INLINE__ */
00011 
00012 
00013 ACE_RCSID (tao,
00014            Incoming_Message_Queue,
00015            "Incoming_Message_Queue.cpp,v 1.23 2006/04/19 08:53:16 jwillemsen Exp")
00016 
00017 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00018 
00019 TAO_Incoming_Message_Queue::TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core)
00020   : last_added_ (0),
00021     size_ (0),
00022     orb_core_ (orb_core)
00023 {
00024 }
00025 
00026 TAO_Incoming_Message_Queue::~TAO_Incoming_Message_Queue (void)
00027 {
00028   const CORBA::ULong sz = this->size_;
00029 
00030   // Delete all the nodes left behind
00031   for (CORBA::ULong i = 0;
00032        i < sz;
00033        ++i)
00034     {
00035       TAO_Queued_Data *qd = this->dequeue_head ();
00036       TAO_Queued_Data::release (qd);
00037     }
00038 }
00039 
00040 
00041 TAO_Queued_Data *
00042 TAO_Incoming_Message_Queue::dequeue_head (void)
00043 {
00044   if (this->size_ == 0)
00045     return 0;
00046 
00047   // Get the node on the head of the queue...
00048   TAO_Queued_Data * const head = this->last_added_->next_;
00049 
00050   // Reset the head node..
00051   this->last_added_->next_ = head->next_;
00052 
00053   // Decrease the size and reset last_added_ if empty
00054   if (--this->size_ == 0)
00055     this->last_added_ = 0;
00056 
00057   return head;
00058 }
00059 
00060 TAO_Queued_Data *
00061 TAO_Incoming_Message_Queue::dequeue_tail (void)
00062 {
00063   // This is a bit painful stuff...
00064   if (this->size_ == 0)
00065     return 0;
00066 
00067   // Get the node on the head of the queue...
00068   TAO_Queued_Data *head =
00069     this->last_added_->next_;
00070 
00071   while (head->next_ != this->last_added_)
00072     {
00073       head = head->next_;
00074     }
00075 
00076   // Put the head in tmp.
00077   head->next_ = this->last_added_->next_;
00078 
00079   TAO_Queued_Data *ret_qd = this->last_added_;
00080 
00081   this->last_added_ = head;
00082 
00083   // Decrease the size
00084   if (--this->size_ == 0)
00085     this->last_added_ = 0;
00086 
00087  return ret_qd;
00088 }
00089 
00090 int
00091 TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd)
00092 {
00093   if (this->size_ == 0)
00094     {
00095       this->last_added_ = nd;
00096       this->last_added_->next_ = this->last_added_;
00097     }
00098   else
00099     {
00100       nd->next_ = this->last_added_->next_;
00101       this->last_added_->next_ = nd;
00102       this->last_added_ = nd;
00103     }
00104 
00105   ++ this->size_;
00106   return 0;
00107 }
00108 
00109 
00110 /************************************************************************/
00111 // Methods  for TAO_Queued_Data
00112 /************************************************************************/
00113 
00114 /*!
00115  * @brief Allocate and return a new empty message block of size \a span_size
00116  * mimicking parameters of \a mb.
00117  *
00118  * This function allocates a new aligned message block using the same
00119  * allocators and flags as found in \a mb.  The size of the new message
00120  * block is at least \a span_size; the size may be adjusted up in order
00121  * to accomodate alignment requirements and still fit \a span_size bytes
00122  * into the aligned buffer.
00123  *
00124  * @param mb message block whose parameters should be mimicked
00125  * @param span_size size of the new message block (will be adjusted for proper
00126  * alignment)
00127  * @return an aligned message block with rd_ptr sitting at correct
00128  * alignment spot, 0 on failure
00129  */
00130 static ACE_Message_Block*
00131 clone_mb_nocopy_size (ACE_Message_Block *mb, size_t span_size)
00132 {
00133   // Calculate the required size of the cloned block with alignment
00134   size_t const aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT);
00135 
00136   // Get the allocators
00137   ACE_Allocator *data_allocator = 0;
00138   ACE_Allocator *data_block_allocator = 0;
00139   ACE_Allocator *message_block_allocator = 0;
00140   mb->access_allocators (data_allocator,
00141                          data_block_allocator,
00142                          message_block_allocator);
00143 
00144   // Create a new Message Block
00145   ACE_Message_Block *nb = 0;
00146   ACE_NEW_MALLOC_RETURN (nb,
00147                          static_cast<ACE_Message_Block*> (
00148                                          message_block_allocator->malloc (
00149                                            sizeof (ACE_Message_Block))),
00150                          ACE_Message_Block(aligned_size,
00151                                            mb->msg_type(),
00152                                            mb->cont(),
00153                                            0, //we want the data block created
00154                                            data_allocator,
00155                                            mb->locking_strategy(),
00156                                            mb->msg_priority(),
00157                                            mb->msg_execution_time (),
00158                                            mb->msg_deadline_time (),
00159                                            data_block_allocator,
00160                                            message_block_allocator),
00161                          0);
00162 
00163   ACE_CDR::mb_align (nb);
00164 
00165   // Copy the flags over, but be SURE to clear the DONT_DELETE flag, since
00166   // we just dynamically allocated the two things.
00167   nb->set_flags (mb->flags());
00168   nb->clr_flags (ACE_Message_Block::DONT_DELETE);
00169 
00170   return nb;
00171 }
00172 
00173 TAO_Queued_Data::TAO_Queued_Data (ACE_Allocator *alloc)
00174   : msg_block_ (0),
00175     missing_data_ (0),
00176     major_version_ (0),
00177     minor_version_ (0),
00178     byte_order_ (0),
00179     more_fragments_ (0),
00180     msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
00181     next_ (0),
00182     allocator_ (alloc)
00183 {
00184 }
00185 
00186 TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb,
00187                                   ACE_Allocator *alloc)
00188   : msg_block_ (mb),
00189     missing_data_ (0),
00190     major_version_ (0),
00191     minor_version_ (0),
00192     byte_order_ (0),
00193     more_fragments_ (0),
00194     msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
00195     next_ (0),
00196     allocator_ (alloc)
00197 {
00198 }
00199 
00200 TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd)
00201   : msg_block_ (qd.msg_block_->duplicate ()),
00202     missing_data_ (qd.missing_data_),
00203     major_version_ (qd.major_version_),
00204     minor_version_ (qd.minor_version_),
00205     byte_order_ (qd.byte_order_),
00206     more_fragments_ (qd.more_fragments_),
00207     msg_type_ (qd.msg_type_),
00208     next_ (0),
00209     allocator_ (qd.allocator_)
00210 {
00211 }
00212 
00213 /*static*/
00214 TAO_Queued_Data *
00215 TAO_Queued_Data::make_queued_data (ACE_Allocator *alloc)
00216 {
00217   TAO_Queued_Data *qd = 0;
00218 
00219   if (alloc)
00220     {
00221       ACE_NEW_MALLOC_RETURN (qd,
00222                              static_cast<TAO_Queued_Data *> (
00223                                alloc->malloc (sizeof (TAO_Queued_Data))),
00224                              TAO_Queued_Data (alloc),
00225                              0);
00226 
00227       return qd;
00228     }
00229 
00230   // No allocator, so use the global pool!
00231   // @@ TODO: We should be removing this at some point of time!
00232   if (TAO_debug_level == 4)
00233     {
00234       // This debug is for testing purposes!
00235       ACE_DEBUG ((LM_DEBUG,
00236                   "TAO (%P|%t) - Queued_Data::get_queued_data\n"
00237                   "Using global pool for allocation \n"));
00238     }
00239 
00240   ACE_NEW_RETURN (qd,
00241                   TAO_Queued_Data,
00242                   0);
00243 
00244   return qd;
00245 }
00246 
00247 /*static*/
00248 void
00249 TAO_Queued_Data::release (TAO_Queued_Data *qd)
00250 {
00251   //// TODO
00252   ACE_Message_Block::release (qd->msg_block_);
00253 
00254   if (qd->allocator_)
00255     {
00256       ACE_DES_FREE (qd,
00257                     qd->allocator_->free,
00258                     TAO_Queued_Data);
00259 
00260       return;
00261     }
00262 
00263   // @@todo: Need to be removed at some point of time!
00264   if (TAO_debug_level == 4)
00265     {
00266       // This debug is for testing purposes!
00267       ACE_DEBUG ((LM_DEBUG,
00268                   "TAO (%P|%t) - Queued_Data[%d]::release\n",
00269                   "Using global pool for releasing \n"));
00270     }
00271   delete qd;
00272 
00273 }
00274 
00275 
00276 TAO_Queued_Data *
00277 TAO_Queued_Data::duplicate (TAO_Queued_Data &sqd)
00278 {
00279   // Check to see if the underlying block is on the stack. If not it
00280   // is fine. If the datablock is on stack, try to make a copy of that
00281   // before doing a duplicate.
00282   // @@ todo: Theoretically this should be within the Message Block,
00283   // but we dont have much scope to do this in that mess. Probably in
00284   // the next stage of MB rewrite we should be okay
00285   ACE_Message_Block::Message_Flags fl =
00286     sqd.msg_block_->self_flags ();
00287 
00288   if (ACE_BIT_ENABLED (fl,
00289                        ACE_Message_Block::DONT_DELETE))
00290     (void) TAO_Queued_Data::replace_data_block (*sqd.msg_block_);
00291 
00292 
00293   TAO_Queued_Data *qd = 0;
00294 
00295   if (sqd.allocator_)
00296     {
00297       ACE_NEW_MALLOC_RETURN (qd,
00298                              static_cast<TAO_Queued_Data *> (
00299                                sqd.allocator_->malloc (sizeof (TAO_Queued_Data))),
00300                              TAO_Queued_Data (sqd),
00301                              0);
00302 
00303       return qd;
00304     }
00305 
00306   // No allocator, so use the global pool!
00307   // @@ TODO: We should be removing this at some point of time!
00308   if (TAO_debug_level == 4)
00309     {
00310       // This debug is for testing purposes!
00311       ACE_DEBUG ((LM_DEBUG,
00312                   "TAO (%P|%t) - Queued_Data[%d]::duplicate\n",
00313                   "Using global pool for allocation \n"));
00314     }
00315 
00316   ACE_NEW_RETURN (qd,
00317                   TAO_Queued_Data (sqd),
00318                   0);
00319 
00320   return qd;
00321 }
00322 
00323 int
00324 TAO_Queued_Data::consolidate (void)
00325 {
00326   // Is this a chain of fragments?
00327   if (this->more_fragments_ && this->msg_block_->cont () != 0)
00328     {
00329       // Create a message block big enough to hold the entire chain
00330       ACE_Message_Block *dest = clone_mb_nocopy_size (
00331                                       this->msg_block_,
00332                                       this->msg_block_->total_length ());
00333 
00334       if (0 == dest)
00335         {
00336           // out of memory
00337           return -1;
00338         }
00339       // Memory allocation succeeded, the new message block can hold the consolidated
00340       // message. The following code just copies all the data into this new message block.
00341       // No further memory allocation will take place.
00342 
00343       // Reset the cont() parameter.  We have cloned the message
00344       // block but not the chain as we will no longer have chain.
00345       dest->cont (0);
00346 
00347       // Use ACE_CDR to consolidate the chain for us
00348       ACE_CDR::consolidate (dest, this->msg_block_);
00349 
00350       // free the original message block chain
00351       this->msg_block_->release ();
00352 
00353       // Set the message block to the new consolidated message block
00354       this->msg_block_ = dest;
00355       this->more_fragments_ = 0;
00356     }
00357 
00358   return 0;
00359 }
00360 
00361 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 11:54:13 2006 for TAO by doxygen 1.3.6