00001 #include "tao/Incoming_Message_Queue.h"
00002 #include "tao/debug.h"
00003
00004 #include "ace/Log_Msg.h"
00005 #include "ace/Malloc_Base.h"
00006
00007
00008 #if !defined (__ACE_INLINE__)
00009 # include "tao/Incoming_Message_Queue.inl"
00010 #endif
00011
00012
00013 ACE_RCSID (tao,
00014 Incoming_Message_Queue,
00015 "Incoming_Message_Queue.cpp,v 1.23 2006/04/19 08:53:16 jwillemsen Exp")
00016
00017 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00018
00019 TAO_Incoming_Message_Queue::TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core)
00020 : last_added_ (0),
00021 size_ (0),
00022 orb_core_ (orb_core)
00023 {
00024 }
00025
00026 TAO_Incoming_Message_Queue::~TAO_Incoming_Message_Queue (void)
00027 {
00028 const CORBA::ULong sz = this->size_;
00029
00030
00031 for (CORBA::ULong i = 0;
00032 i < sz;
00033 ++i)
00034 {
00035 TAO_Queued_Data *qd = this->dequeue_head ();
00036 TAO_Queued_Data::release (qd);
00037 }
00038 }
00039
00040
00041 TAO_Queued_Data *
00042 TAO_Incoming_Message_Queue::dequeue_head (void)
00043 {
00044 if (this->size_ == 0)
00045 return 0;
00046
00047
00048 TAO_Queued_Data * const head = this->last_added_->next_;
00049
00050
00051 this->last_added_->next_ = head->next_;
00052
00053
00054 if (--this->size_ == 0)
00055 this->last_added_ = 0;
00056
00057 return head;
00058 }
00059
00060 TAO_Queued_Data *
00061 TAO_Incoming_Message_Queue::dequeue_tail (void)
00062 {
00063
00064 if (this->size_ == 0)
00065 return 0;
00066
00067
00068 TAO_Queued_Data *head =
00069 this->last_added_->next_;
00070
00071 while (head->next_ != this->last_added_)
00072 {
00073 head = head->next_;
00074 }
00075
00076
00077 head->next_ = this->last_added_->next_;
00078
00079 TAO_Queued_Data *ret_qd = this->last_added_;
00080
00081 this->last_added_ = head;
00082
00083
00084 if (--this->size_ == 0)
00085 this->last_added_ = 0;
00086
00087 return ret_qd;
00088 }
00089
00090 int
00091 TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd)
00092 {
00093 if (this->size_ == 0)
00094 {
00095 this->last_added_ = nd;
00096 this->last_added_->next_ = this->last_added_;
00097 }
00098 else
00099 {
00100 nd->next_ = this->last_added_->next_;
00101 this->last_added_->next_ = nd;
00102 this->last_added_ = nd;
00103 }
00104
00105 ++ this->size_;
00106 return 0;
00107 }
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130 static ACE_Message_Block*
00131 clone_mb_nocopy_size (ACE_Message_Block *mb, size_t span_size)
00132 {
00133
00134 size_t const aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT);
00135
00136
00137 ACE_Allocator *data_allocator = 0;
00138 ACE_Allocator *data_block_allocator = 0;
00139 ACE_Allocator *message_block_allocator = 0;
00140 mb->access_allocators (data_allocator,
00141 data_block_allocator,
00142 message_block_allocator);
00143
00144
00145 ACE_Message_Block *nb = 0;
00146 ACE_NEW_MALLOC_RETURN (nb,
00147 static_cast<ACE_Message_Block*> (
00148 message_block_allocator->malloc (
00149 sizeof (ACE_Message_Block))),
00150 ACE_Message_Block(aligned_size,
00151 mb->msg_type(),
00152 mb->cont(),
00153 0,
00154 data_allocator,
00155 mb->locking_strategy(),
00156 mb->msg_priority(),
00157 mb->msg_execution_time (),
00158 mb->msg_deadline_time (),
00159 data_block_allocator,
00160 message_block_allocator),
00161 0);
00162
00163 ACE_CDR::mb_align (nb);
00164
00165
00166
00167 nb->set_flags (mb->flags());
00168 nb->clr_flags (ACE_Message_Block::DONT_DELETE);
00169
00170 return nb;
00171 }
00172
00173 TAO_Queued_Data::TAO_Queued_Data (ACE_Allocator *alloc)
00174 : msg_block_ (0),
00175 missing_data_ (0),
00176 major_version_ (0),
00177 minor_version_ (0),
00178 byte_order_ (0),
00179 more_fragments_ (0),
00180 msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
00181 next_ (0),
00182 allocator_ (alloc)
00183 {
00184 }
00185
00186 TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb,
00187 ACE_Allocator *alloc)
00188 : msg_block_ (mb),
00189 missing_data_ (0),
00190 major_version_ (0),
00191 minor_version_ (0),
00192 byte_order_ (0),
00193 more_fragments_ (0),
00194 msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
00195 next_ (0),
00196 allocator_ (alloc)
00197 {
00198 }
00199
00200 TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd)
00201 : msg_block_ (qd.msg_block_->duplicate ()),
00202 missing_data_ (qd.missing_data_),
00203 major_version_ (qd.major_version_),
00204 minor_version_ (qd.minor_version_),
00205 byte_order_ (qd.byte_order_),
00206 more_fragments_ (qd.more_fragments_),
00207 msg_type_ (qd.msg_type_),
00208 next_ (0),
00209 allocator_ (qd.allocator_)
00210 {
00211 }
00212
00213
00214 TAO_Queued_Data *
00215 TAO_Queued_Data::make_queued_data (ACE_Allocator *alloc)
00216 {
00217 TAO_Queued_Data *qd = 0;
00218
00219 if (alloc)
00220 {
00221 ACE_NEW_MALLOC_RETURN (qd,
00222 static_cast<TAO_Queued_Data *> (
00223 alloc->malloc (sizeof (TAO_Queued_Data))),
00224 TAO_Queued_Data (alloc),
00225 0);
00226
00227 return qd;
00228 }
00229
00230
00231
00232 if (TAO_debug_level == 4)
00233 {
00234
00235 ACE_DEBUG ((LM_DEBUG,
00236 "TAO (%P|%t) - Queued_Data::get_queued_data\n"
00237 "Using global pool for allocation \n"));
00238 }
00239
00240 ACE_NEW_RETURN (qd,
00241 TAO_Queued_Data,
00242 0);
00243
00244 return qd;
00245 }
00246
00247
00248 void
00249 TAO_Queued_Data::release (TAO_Queued_Data *qd)
00250 {
00251
00252 ACE_Message_Block::release (qd->msg_block_);
00253
00254 if (qd->allocator_)
00255 {
00256 ACE_DES_FREE (qd,
00257 qd->allocator_->free,
00258 TAO_Queued_Data);
00259
00260 return;
00261 }
00262
00263
00264 if (TAO_debug_level == 4)
00265 {
00266
00267 ACE_DEBUG ((LM_DEBUG,
00268 "TAO (%P|%t) - Queued_Data[%d]::release\n",
00269 "Using global pool for releasing \n"));
00270 }
00271 delete qd;
00272
00273 }
00274
00275
00276 TAO_Queued_Data *
00277 TAO_Queued_Data::duplicate (TAO_Queued_Data &sqd)
00278 {
00279
00280
00281
00282
00283
00284
00285 ACE_Message_Block::Message_Flags fl =
00286 sqd.msg_block_->self_flags ();
00287
00288 if (ACE_BIT_ENABLED (fl,
00289 ACE_Message_Block::DONT_DELETE))
00290 (void) TAO_Queued_Data::replace_data_block (*sqd.msg_block_);
00291
00292
00293 TAO_Queued_Data *qd = 0;
00294
00295 if (sqd.allocator_)
00296 {
00297 ACE_NEW_MALLOC_RETURN (qd,
00298 static_cast<TAO_Queued_Data *> (
00299 sqd.allocator_->malloc (sizeof (TAO_Queued_Data))),
00300 TAO_Queued_Data (sqd),
00301 0);
00302
00303 return qd;
00304 }
00305
00306
00307
00308 if (TAO_debug_level == 4)
00309 {
00310
00311 ACE_DEBUG ((LM_DEBUG,
00312 "TAO (%P|%t) - Queued_Data[%d]::duplicate\n",
00313 "Using global pool for allocation \n"));
00314 }
00315
00316 ACE_NEW_RETURN (qd,
00317 TAO_Queued_Data (sqd),
00318 0);
00319
00320 return qd;
00321 }
00322
00323 int
00324 TAO_Queued_Data::consolidate (void)
00325 {
00326
00327 if (this->more_fragments_ && this->msg_block_->cont () != 0)
00328 {
00329
00330 ACE_Message_Block *dest = clone_mb_nocopy_size (
00331 this->msg_block_,
00332 this->msg_block_->total_length ());
00333
00334 if (0 == dest)
00335 {
00336
00337 return -1;
00338 }
00339
00340
00341
00342
00343
00344
00345 dest->cont (0);
00346
00347
00348 ACE_CDR::consolidate (dest, this->msg_block_);
00349
00350
00351 this->msg_block_->release ();
00352
00353
00354 this->msg_block_ = dest;
00355 this->more_fragments_ = 0;
00356 }
00357
00358 return 0;
00359 }
00360
00361 TAO_END_VERSIONED_NAMESPACE_DECL