Go to the documentation of this file.00001
00002
00003 #include "tao/Synch_Queued_Message.h"
00004 #include "tao/debug.h"
00005 #include "tao/ORB_Core.h"
00006
00007 #include "ace/Malloc_T.h"
00008 #include "ace/Message_Block.h"
00009
00010 ACE_RCSID (tao,
00011 Synch_Queued_Message,
00012 "$Id: Synch_Queued_Message.cpp 81839 2008-06-05 11:58:32Z elliott_c $")
00013
00014 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00015
00016 TAO_Synch_Queued_Message::TAO_Synch_Queued_Message (
00017 const ACE_Message_Block *contents,
00018 TAO_ORB_Core *oc,
00019 ACE_Allocator *alloc,
00020 bool is_heap_allocated)
00021 : TAO_Queued_Message (oc, alloc, is_heap_allocated)
00022 , contents_ (const_cast<ACE_Message_Block*> (contents))
00023 , current_block_ (contents_)
00024 , own_contents_ (is_heap_allocated)
00025 {
00026 }
00027
00028 TAO_Synch_Queued_Message::~TAO_Synch_Queued_Message (void)
00029 {
00030
00031 }
00032
00033 const ACE_Message_Block *
00034 TAO_Synch_Queued_Message::current_block (void) const
00035 {
00036 return this->current_block_;
00037 }
00038
00039 size_t
00040 TAO_Synch_Queued_Message::message_length (void) const
00041 {
00042 if (this->current_block_ == 0)
00043 {
00044 return 0;
00045 }
00046
00047 return this->current_block_->total_length ();
00048 }
00049
00050 int
00051 TAO_Synch_Queued_Message::all_data_sent (void) const
00052 {
00053 return this->current_block_ == 0;
00054 }
00055
00056 void
00057 TAO_Synch_Queued_Message::fill_iov (int iovcnt_max,
00058 int &iovcnt,
00059 iovec iov[]) const
00060 {
00061 ACE_ASSERT (iovcnt_max > iovcnt);
00062
00063 for (const ACE_Message_Block *message_block = this->current_block_;
00064 message_block != 0 && iovcnt < iovcnt_max;
00065 message_block = message_block->cont ())
00066 {
00067 size_t const message_block_length = message_block->length ();
00068
00069
00070 if (message_block_length > 0)
00071 {
00072
00073 iov[iovcnt].iov_base = message_block->rd_ptr ();
00074 iov[iovcnt].iov_len = static_cast<u_long> (message_block_length);
00075
00076
00077 ++iovcnt;
00078 }
00079 }
00080 }
00081
00082 void
00083 TAO_Synch_Queued_Message::bytes_transferred (size_t &byte_count)
00084 {
00085 this->state_changed_i (TAO_LF_Event::LFS_ACTIVE);
00086
00087 while (this->current_block_ != 0 && byte_count > 0)
00088 {
00089 size_t const l = this->current_block_->length ();
00090
00091 if (byte_count < l)
00092 {
00093 this->current_block_->rd_ptr (byte_count);
00094 byte_count = 0;
00095 return;
00096 }
00097
00098 byte_count -= l;
00099 this->current_block_->rd_ptr (l);
00100 this->current_block_ = this->current_block_->cont ();
00101
00102 while (this->current_block_ != 0
00103 && this->current_block_->length () == 0)
00104 {
00105 this->current_block_ = this->current_block_->cont ();
00106 }
00107 }
00108
00109 if (this->current_block_ == 0)
00110 this->state_changed (TAO_LF_Event::LFS_SUCCESS,
00111 this->orb_core_->leader_follower ());
00112 }
00113
00114 TAO_Queued_Message *
00115 TAO_Synch_Queued_Message::clone (ACE_Allocator *alloc)
00116 {
00117 TAO_Synch_Queued_Message *qm = 0;
00118
00119
00120
00121
00122
00123
00124 ACE_Message_Block *mb = this->current_block_->clone ();
00125
00126 if (alloc)
00127 {
00128 ACE_NEW_MALLOC_RETURN (qm,
00129 static_cast<TAO_Synch_Queued_Message *> (
00130 alloc->malloc (sizeof (TAO_Synch_Queued_Message))),
00131 TAO_Synch_Queued_Message (mb,
00132 this->orb_core_,
00133 alloc,
00134 true),
00135 0);
00136 }
00137 else
00138 {
00139 ACE_NEW_RETURN (qm,
00140 TAO_Synch_Queued_Message (mb, this->orb_core_, 0, true),
00141 0);
00142 }
00143
00144 return qm;
00145 }
00146
00147 void
00148 TAO_Synch_Queued_Message::destroy (void)
00149 {
00150 if (this->own_contents_)
00151 {
00152 ACE_Message_Block::release (this->contents_);
00153 this->current_block_ = 0;
00154 this->contents_ = 0;
00155 }
00156
00157 if (this->is_heap_created_)
00158 {
00159
00160
00161 if (this->allocator_)
00162 {
00163 ACE_DES_FREE (this,
00164 this->allocator_->free,
00165 TAO_Synch_Queued_Message);
00166
00167 }
00168 else
00169 {
00170 delete this;
00171 }
00172 }
00173 }
00174
00175 void
00176 TAO_Synch_Queued_Message::copy_if_necessary (const ACE_Message_Block* chain)
00177 {
00178 if (!this->own_contents_)
00179 {
00180
00181
00182 for (const ACE_Message_Block* mb = chain; mb != 0; mb = mb->cont ())
00183 {
00184 if (mb == this->current_block_)
00185 {
00186
00187
00188
00189
00190
00191 this->own_contents_ = true;
00192 this->contents_ = this->current_block_->clone ();
00193 this->current_block_ = this->contents_;
00194 break;
00195 }
00196 }
00197 }
00198 }
00199
00200 TAO_END_VERSIONED_NAMESPACE_DECL