00001 #include "tao/Asynch_Queued_Message.h"
00002 #include "tao/debug.h"
00003 #include "tao/ORB_Core.h"
00004
00005 #include "ace/OS_Memory.h"
00006 #include "ace/OS_NS_string.h"
00007 #include "ace/os_include/sys/os_uio.h"
00008 #include "ace/Log_Msg.h"
00009 #include "ace/Message_Block.h"
00010 #include "ace/Malloc_Base.h"
00011 #include "ace/High_Res_Timer.h"
00012
00013 ACE_RCSID (tao,
00014 Asynch_Queued_Message,
00015 "$Id: Asynch_Queued_Message.cpp 85406 2009-05-20 09:07:56Z johnnyw $")
00016
00017 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00018
00019 TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (
00020 const ACE_Message_Block *contents,
00021 TAO_ORB_Core *oc,
00022 ACE_Time_Value *timeout,
00023 ACE_Allocator *alloc,
00024 bool is_heap_allocated)
00025 : TAO_Queued_Message (oc, alloc, is_heap_allocated)
00026 , size_ (contents->total_length ())
00027 , offset_ (0)
00028 , abs_timeout_ (ACE_Time_Value::zero)
00029 {
00030 if (timeout != 0)
00031 {
00032 this->abs_timeout_ = ACE_High_Res_Timer::gettimeofday_hr () + *timeout;
00033 }
00034
00035 ACE_NEW (this->buffer_, char[this->size_]);
00036
00037 size_t copy_offset = 0;
00038 for (const ACE_Message_Block *i = contents;
00039 i != 0;
00040 i = i->cont ())
00041 {
00042 ACE_OS::memcpy (this->buffer_ + copy_offset,
00043 i->rd_ptr (),
00044 i->length ());
00045 copy_offset += i->length ();
00046 }
00047 }
00048
00049 TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (char *buf,
00050 TAO_ORB_Core *oc,
00051 size_t size,
00052 const ACE_Time_Value &abs_timeout,
00053 ACE_Allocator *alloc,
00054 bool is_heap_allocated)
00055 : TAO_Queued_Message (oc, alloc, is_heap_allocated)
00056 , size_ (size)
00057 , offset_ (0)
00058 , buffer_ (buf)
00059 , abs_timeout_ (abs_timeout)
00060 {
00061 }
00062
00063 TAO_Asynch_Queued_Message::~TAO_Asynch_Queued_Message (void)
00064 {
00065
00066 delete [] this->buffer_;
00067 }
00068
00069 size_t
00070 TAO_Asynch_Queued_Message::message_length (void) const
00071 {
00072 return this->size_ - this->offset_;
00073 }
00074
00075 int
00076 TAO_Asynch_Queued_Message::all_data_sent (void) const
00077 {
00078 return this->size_ == this->offset_;
00079 }
00080
00081 void
00082 TAO_Asynch_Queued_Message::fill_iov (int iovcnt_max,
00083 int &iovcnt,
00084 iovec iov[]) const
00085 {
00086 ACE_ASSERT (iovcnt_max > iovcnt);
00087 ACE_UNUSED_ARG (iovcnt_max);
00088
00089 iov[iovcnt].iov_base = this->buffer_ + this->offset_;
00090 iov[iovcnt].iov_len = static_cast<u_long> (this->size_ - this->offset_);
00091 ++iovcnt;
00092 }
00093
00094 void
00095 TAO_Asynch_Queued_Message::bytes_transferred (size_t &byte_count)
00096 {
00097 this->state_changed_i (TAO_LF_Event::LFS_ACTIVE);
00098
00099 size_t const remaining_bytes = this->size_ - this->offset_;
00100 if (byte_count > remaining_bytes)
00101 {
00102 this->offset_ = this->size_;
00103 byte_count -= remaining_bytes;
00104 return;
00105 }
00106 this->offset_ += byte_count;
00107 byte_count = 0;
00108
00109 if (this->all_data_sent ())
00110 this->state_changed (TAO_LF_Event::LFS_SUCCESS,
00111 this->orb_core_->leader_follower ());
00112 }
00113
00114
00115 TAO_Queued_Message *
00116 TAO_Asynch_Queued_Message::clone (ACE_Allocator *alloc)
00117 {
00118 char *buf = 0;
00119
00120
00121
00122
00123
00124
00125 size_t const sz = this->size_ - this->offset_;
00126
00127 ACE_NEW_RETURN (buf,
00128 char[sz],
00129 0);
00130
00131 ACE_OS::memcpy (buf,
00132 this->buffer_ + this->offset_,
00133 sz);
00134
00135 TAO_Asynch_Queued_Message *qm = 0;
00136
00137 if (alloc)
00138 {
00139 ACE_NEW_MALLOC_RETURN (qm,
00140 static_cast<TAO_Asynch_Queued_Message *> (
00141 alloc->malloc (sizeof (TAO_Asynch_Queued_Message))),
00142 TAO_Asynch_Queued_Message (buf,
00143 this->orb_core_,
00144 sz,
00145 this->abs_timeout_,
00146 alloc,
00147 true),
00148 0);
00149 }
00150 else
00151 {
00152
00153 if (TAO_debug_level == 4)
00154 {
00155
00156 ACE_DEBUG ((LM_DEBUG,
00157 "TAO (%P|%t) - Asynch_Queued_Message::clone\n"
00158 "Using global pool for allocation\n"));
00159 }
00160
00161 ACE_NEW_RETURN (qm,
00162 TAO_Asynch_Queued_Message (buf,
00163 this->orb_core_,
00164 sz,
00165 this->abs_timeout_,
00166 0,
00167 true),
00168 0);
00169 }
00170
00171 return qm;
00172 }
00173
00174 void
00175 TAO_Asynch_Queued_Message::destroy (void)
00176 {
00177 if (this->is_heap_created_)
00178 {
00179
00180
00181 if (this->allocator_)
00182 {
00183 ACE_DES_FREE (this,
00184 this->allocator_->free,
00185 TAO_Asynch_Queued_Message);
00186
00187 }
00188 else
00189 {
00190 delete this;
00191 }
00192 }
00193 }
00194
00195 bool
00196 TAO_Asynch_Queued_Message::is_expired (const ACE_Time_Value &now) const
00197 {
00198 if (this->abs_timeout_ > ACE_Time_Value::zero)
00199 {
00200 if (this->offset_ > 0)
00201 {
00202 return false;
00203 }
00204 return this->abs_timeout_ < now;
00205 }
00206 return false;
00207 }
00208
00209 void
00210 TAO_Asynch_Queued_Message::copy_if_necessary (const ACE_Message_Block*)
00211 {
00212
00213 }
00214
00215 TAO_END_VERSIONED_NAMESPACE_DECL