00001
00002
00003 #include "tao/Synch_Queued_Message.h"
00004 #include "tao/debug.h"
00005 #include "tao/ORB_Core.h"
00006
00007 #include "ace/Malloc_T.h"
00008 #include "ace/Message_Block.h"
00009
00010 ACE_RCSID (tao,
00011 Synch_Queued_Message,
00012 "$Id: Synch_Queued_Message.cpp 79145 2007-08-01 07:38:40Z johnnyw $")
00013
00014 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00015
00016 TAO_Synch_Queued_Message::TAO_Synch_Queued_Message (
00017 const ACE_Message_Block *contents,
00018 TAO_ORB_Core *oc,
00019 ACE_Allocator *alloc,
00020 bool is_heap_allocated)
00021 : TAO_Queued_Message (oc, alloc, is_heap_allocated)
00022 , contents_ (const_cast<ACE_Message_Block*> (contents))
00023 , current_block_ (contents_)
00024 {
00025 }
00026
00027 TAO_Synch_Queued_Message::~TAO_Synch_Queued_Message (void)
00028 {
00029
00030 }
00031
00032 const ACE_Message_Block *
00033 TAO_Synch_Queued_Message::current_block (void) const
00034 {
00035 return this->current_block_;
00036 }
00037
00038 size_t
00039 TAO_Synch_Queued_Message::message_length (void) const
00040 {
00041 if (this->current_block_ == 0)
00042 {
00043 return 0;
00044 }
00045
00046 return this->current_block_->total_length ();
00047 }
00048
00049 int
00050 TAO_Synch_Queued_Message::all_data_sent (void) const
00051 {
00052 return this->current_block_ == 0;
00053 }
00054
00055 void
00056 TAO_Synch_Queued_Message::fill_iov (int iovcnt_max,
00057 int &iovcnt,
00058 iovec iov[]) const
00059 {
00060 ACE_ASSERT (iovcnt_max > iovcnt);
00061
00062 for (const ACE_Message_Block *message_block = this->current_block_;
00063 message_block != 0 && iovcnt < iovcnt_max;
00064 message_block = message_block->cont ())
00065 {
00066 size_t const message_block_length = message_block->length ();
00067
00068
00069 if (message_block_length > 0)
00070 {
00071
00072 iov[iovcnt].iov_base = message_block->rd_ptr ();
00073 iov[iovcnt].iov_len = static_cast<u_long> (message_block_length);
00074
00075
00076 ++iovcnt;
00077 }
00078 }
00079 }
00080
00081 void
00082 TAO_Synch_Queued_Message::bytes_transferred (size_t &byte_count)
00083 {
00084 this->state_changed_i (TAO_LF_Event::LFS_ACTIVE);
00085
00086 while (this->current_block_ != 0 && byte_count > 0)
00087 {
00088 size_t const l = this->current_block_->length ();
00089
00090 if (byte_count < l)
00091 {
00092 this->current_block_->rd_ptr (byte_count);
00093 byte_count = 0;
00094 return;
00095 }
00096
00097 byte_count -= l;
00098 this->current_block_->rd_ptr (l);
00099 this->current_block_ = this->current_block_->cont ();
00100
00101 while (this->current_block_ != 0
00102 && this->current_block_->length () == 0)
00103 {
00104 this->current_block_ = this->current_block_->cont ();
00105 }
00106 }
00107
00108 if (this->current_block_ == 0)
00109 this->state_changed (TAO_LF_Event::LFS_SUCCESS,
00110 this->orb_core_->leader_follower ());
00111 }
00112
00113 TAO_Queued_Message *
00114 TAO_Synch_Queued_Message::clone (ACE_Allocator *alloc)
00115 {
00116 TAO_Synch_Queued_Message *qm = 0;
00117
00118
00119
00120
00121
00122
00123 ACE_Message_Block *mb = this->current_block_->clone ();
00124
00125 if (alloc)
00126 {
00127 ACE_NEW_MALLOC_RETURN (qm,
00128 static_cast<TAO_Synch_Queued_Message *> (
00129 alloc->malloc (sizeof (TAO_Synch_Queued_Message))),
00130 TAO_Synch_Queued_Message (mb,
00131 this->orb_core_,
00132 alloc,
00133 true),
00134 0);
00135 }
00136 else
00137 {
00138 ACE_NEW_RETURN (qm,
00139 TAO_Synch_Queued_Message (mb, this->orb_core_, 0, true),
00140 0);
00141 }
00142
00143 return qm;
00144 }
00145
00146 void
00147 TAO_Synch_Queued_Message::destroy (void)
00148 {
00149 if (this->is_heap_created_)
00150 {
00151 ACE_Message_Block::release (this->contents_);
00152 this->current_block_ = 0;
00153
00154
00155
00156 if (this->allocator_)
00157 {
00158 ACE_DES_FREE (this,
00159 this->allocator_->free,
00160 TAO_Synch_Queued_Message);
00161
00162 }
00163 else
00164 {
00165 delete this;
00166 }
00167 }
00168 }
00169
00170 TAO_END_VERSIONED_NAMESPACE_DECL