00001 #include "tao/Asynch_Queued_Message.h"
00002 #include "tao/debug.h"
00003 #include "tao/ORB_Core.h"
00004
00005 #include "ace/OS_Memory.h"
00006 #include "ace/OS_NS_string.h"
00007 #include "ace/os_include/sys/os_uio.h"
00008 #include "ace/Log_Msg.h"
00009 #include "ace/Message_Block.h"
00010 #include "ace/Malloc_Base.h"
00011 #include "ace/High_Res_Timer.h"
00012
00013 ACE_RCSID (tao,
00014 Asynch_Queued_Message,
00015 "$Id: Asynch_Queued_Message.cpp 79169 2007-08-02 08:41:23Z johnnyw $")
00016
00017 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00018
00019 TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (
00020 const ACE_Message_Block *contents,
00021 TAO_ORB_Core *oc,
00022 ACE_Time_Value *timeout,
00023 ACE_Allocator *alloc,
00024 bool is_heap_allocated)
00025 : TAO_Queued_Message (oc, alloc, is_heap_allocated)
00026 , size_ (contents->total_length ())
00027 , offset_ (0)
00028 , abs_timeout_ (ACE_Time_Value::zero)
00029 {
00030 if (timeout != 0)
00031 this->abs_timeout_ = ACE_High_Res_Timer::gettimeofday_hr () + *timeout;
00032
00033 ACE_NEW (this->buffer_, char[this->size_]);
00034
00035 size_t copy_offset = 0;
00036 for (const ACE_Message_Block *i = contents;
00037 i != 0;
00038 i = i->cont ())
00039 {
00040 ACE_OS::memcpy (this->buffer_ + copy_offset,
00041 i->rd_ptr (),
00042 i->length ());
00043 copy_offset += i->length ();
00044 }
00045 }
00046
00047 TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (char *buf,
00048 TAO_ORB_Core *oc,
00049 size_t size,
00050 const ACE_Time_Value &abs_timeout,
00051 ACE_Allocator *alloc,
00052 bool is_heap_allocated)
00053 : TAO_Queued_Message (oc, alloc, is_heap_allocated)
00054 , size_ (size)
00055 , offset_ (0)
00056 , buffer_ (buf)
00057 , abs_timeout_ (abs_timeout)
00058 {
00059 }
00060
00061 TAO_Asynch_Queued_Message::~TAO_Asynch_Queued_Message (void)
00062 {
00063
00064 delete [] this->buffer_;
00065 }
00066
00067 size_t
00068 TAO_Asynch_Queued_Message::message_length (void) const
00069 {
00070 return this->size_ - this->offset_;
00071 }
00072
00073 int
00074 TAO_Asynch_Queued_Message::all_data_sent (void) const
00075 {
00076 return this->size_ == this->offset_;
00077 }
00078
00079 void
00080 TAO_Asynch_Queued_Message::fill_iov (int iovcnt_max,
00081 int &iovcnt,
00082 iovec iov[]) const
00083 {
00084 ACE_ASSERT (iovcnt_max > iovcnt);
00085 ACE_UNUSED_ARG (iovcnt_max);
00086
00087 iov[iovcnt].iov_base = this->buffer_ + this->offset_;
00088 iov[iovcnt].iov_len = static_cast<u_long> (this->size_ - this->offset_);
00089 ++iovcnt;
00090 }
00091
00092 void
00093 TAO_Asynch_Queued_Message::bytes_transferred (size_t &byte_count)
00094 {
00095 this->state_changed_i (TAO_LF_Event::LFS_ACTIVE);
00096
00097 size_t const remaining_bytes = this->size_ - this->offset_;
00098 if (byte_count > remaining_bytes)
00099 {
00100 this->offset_ = this->size_;
00101 byte_count -= remaining_bytes;
00102 return;
00103 }
00104 this->offset_ += byte_count;
00105 byte_count = 0;
00106
00107 if (this->all_data_sent ())
00108 this->state_changed (TAO_LF_Event::LFS_SUCCESS,
00109 this->orb_core_->leader_follower ());
00110 }
00111
00112
00113 TAO_Queued_Message *
00114 TAO_Asynch_Queued_Message::clone (ACE_Allocator *alloc)
00115 {
00116 char *buf = 0;
00117
00118
00119
00120
00121
00122
00123 size_t const sz = this->size_ - this->offset_;
00124
00125 ACE_NEW_RETURN (buf,
00126 char[sz],
00127 0);
00128
00129 ACE_OS::memcpy (buf,
00130 this->buffer_ + this->offset_,
00131 sz);
00132
00133 TAO_Asynch_Queued_Message *qm = 0;
00134
00135 if (alloc)
00136 {
00137 ACE_NEW_MALLOC_RETURN (qm,
00138 static_cast<TAO_Asynch_Queued_Message *> (
00139 alloc->malloc (sizeof (TAO_Asynch_Queued_Message))),
00140 TAO_Asynch_Queued_Message (buf,
00141 this->orb_core_,
00142 sz,
00143 this->abs_timeout_,
00144 alloc,
00145 true),
00146 0);
00147 }
00148 else
00149 {
00150
00151 if (TAO_debug_level == 4)
00152 {
00153
00154 ACE_DEBUG ((LM_DEBUG,
00155 "TAO (%P|%t) - Asynch_Queued_Message::clone\n"
00156 "Using global pool for allocation \n"));
00157 }
00158
00159 ACE_NEW_RETURN (qm,
00160 TAO_Asynch_Queued_Message (buf,
00161 this->orb_core_,
00162 sz,
00163 this->abs_timeout_,
00164 0,
00165 true),
00166 0);
00167 }
00168
00169 return qm;
00170 }
00171
00172 void
00173 TAO_Asynch_Queued_Message::destroy (void)
00174 {
00175 if (this->is_heap_created_)
00176 {
00177
00178
00179 if (this->allocator_)
00180 {
00181 ACE_DES_FREE (this,
00182 this->allocator_->free,
00183 TAO_Asynch_Queued_Message);
00184
00185 }
00186 else
00187 {
00188 delete this;
00189 }
00190 }
00191 }
00192
00193 bool
00194 TAO_Asynch_Queued_Message::is_expired (const ACE_Time_Value &now) const
00195 {
00196 if (this->abs_timeout_ > ACE_Time_Value::zero)
00197 {
00198 if (this->offset_ > 0)
00199 {
00200 return false;
00201 }
00202 return this->abs_timeout_ < now;
00203 }
00204 return false;
00205 }
00206
00207 TAO_END_VERSIONED_NAMESPACE_DECL