Asynch_Queued_Message.cpp

Go to the documentation of this file.
00001 #include "tao/Asynch_Queued_Message.h"
00002 #include "tao/debug.h"
00003 #include "tao/ORB_Core.h"
00004 
00005 #include "ace/OS_Memory.h"
00006 #include "ace/OS_NS_string.h"
00007 #include "ace/os_include/sys/os_uio.h"
00008 #include "ace/Log_Msg.h"
00009 #include "ace/Message_Block.h"
00010 #include "ace/Malloc_Base.h"
00011 #include "ace/High_Res_Timer.h"
00012 
00013 ACE_RCSID (tao,
00014            Asynch_Queued_Message,
00015            "$Id: Asynch_Queued_Message.cpp 79169 2007-08-02 08:41:23Z johnnyw $")
00016 
00017 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00018 
00019 TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (
00020   const ACE_Message_Block *contents,
00021   TAO_ORB_Core *oc,
00022   ACE_Time_Value *timeout,
00023   ACE_Allocator *alloc,
00024   bool is_heap_allocated)
00025   : TAO_Queued_Message (oc, alloc, is_heap_allocated)
00026   , size_ (contents->total_length ())
00027   , offset_ (0)
00028   , abs_timeout_ (ACE_Time_Value::zero)
00029 {
00030   if (timeout != 0)// && *timeout != ACE_Time_Value::zero)
00031     this->abs_timeout_ = ACE_High_Res_Timer::gettimeofday_hr () + *timeout;
00032   // @@ Use a pool for these guys!!
00033   ACE_NEW (this->buffer_, char[this->size_]);
00034 
00035   size_t copy_offset = 0;
00036   for (const ACE_Message_Block *i = contents;
00037        i != 0;
00038        i = i->cont ())
00039     {
00040       ACE_OS::memcpy (this->buffer_ + copy_offset,
00041                       i->rd_ptr (),
00042                       i->length ());
00043       copy_offset += i->length ();
00044     }
00045 }
00046 
00047 TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (char *buf,
00048                                                       TAO_ORB_Core *oc,
00049                                                       size_t size,
00050                                                       const ACE_Time_Value &abs_timeout,
00051                                                       ACE_Allocator *alloc,
00052                                                       bool is_heap_allocated)
00053   : TAO_Queued_Message (oc, alloc, is_heap_allocated)
00054   , size_ (size)
00055   , offset_ (0)
00056   , buffer_ (buf)
00057   , abs_timeout_ (abs_timeout)
00058 {
00059 }
00060 
00061 TAO_Asynch_Queued_Message::~TAO_Asynch_Queued_Message (void)
00062 {
00063   // @@ Use a pool for these guys!
00064   delete [] this->buffer_;
00065 }
00066 
00067 size_t
00068 TAO_Asynch_Queued_Message::message_length (void) const
00069 {
00070   return this->size_ - this->offset_;
00071 }
00072 
00073 int
00074 TAO_Asynch_Queued_Message::all_data_sent (void) const
00075 {
00076   return this->size_ == this->offset_;
00077 }
00078 
00079 void
00080 TAO_Asynch_Queued_Message::fill_iov (int iovcnt_max,
00081                                      int &iovcnt,
00082                                      iovec iov[]) const
00083 {
00084   ACE_ASSERT (iovcnt_max > iovcnt);
00085   ACE_UNUSED_ARG (iovcnt_max); // not used if ACE_ASSERT() is empty
00086 
00087   iov[iovcnt].iov_base = this->buffer_ + this->offset_;
00088   iov[iovcnt].iov_len  = static_cast<u_long> (this->size_ - this->offset_);
00089   ++iovcnt;
00090 }
00091 
00092 void
00093 TAO_Asynch_Queued_Message::bytes_transferred (size_t &byte_count)
00094 {
00095   this->state_changed_i (TAO_LF_Event::LFS_ACTIVE);
00096 
00097   size_t const remaining_bytes = this->size_ - this->offset_;
00098   if (byte_count > remaining_bytes)
00099     {
00100       this->offset_ = this->size_;
00101       byte_count -= remaining_bytes;
00102       return;
00103     }
00104   this->offset_ += byte_count;
00105   byte_count = 0;
00106 
00107   if (this->all_data_sent ())
00108     this->state_changed (TAO_LF_Event::LFS_SUCCESS,
00109                          this->orb_core_->leader_follower ());
00110 }
00111 
00112 
00113 TAO_Queued_Message *
00114 TAO_Asynch_Queued_Message::clone (ACE_Allocator *alloc)
00115 {
00116   char *buf = 0;
00117 
00118   // @todo: Need to use a memory pool. But certain things need to
00119   // change a bit in this class for that. Till then.
00120 
00121   // Just allocate and copy data that needs to be sent, no point
00122   // copying the whole buffer.
00123   size_t const sz = this->size_ - this->offset_;
00124 
00125   ACE_NEW_RETURN (buf,
00126                   char[sz],
00127                   0);
00128 
00129   ACE_OS::memcpy (buf,
00130                   this->buffer_ + this->offset_,
00131                   sz);
00132 
00133   TAO_Asynch_Queued_Message *qm = 0;
00134 
00135   if (alloc)
00136     {
00137       ACE_NEW_MALLOC_RETURN (qm,
00138                              static_cast<TAO_Asynch_Queued_Message *> (
00139                                  alloc->malloc (sizeof (TAO_Asynch_Queued_Message))),
00140                              TAO_Asynch_Queued_Message (buf,
00141                                                         this->orb_core_,
00142                                                         sz,
00143                                                         this->abs_timeout_,
00144                                                         alloc,
00145                                                         true),
00146                              0);
00147     }
00148   else
00149     {
00150       // No allocator, so use the common heap!
00151       if (TAO_debug_level == 4)
00152         {
00153           // This debug is for testing purposes!
00154           ACE_DEBUG ((LM_DEBUG,
00155                       "TAO (%P|%t) - Asynch_Queued_Message::clone\n"
00156                       "Using global pool for allocation \n"));
00157         }
00158 
00159       ACE_NEW_RETURN (qm,
00160                       TAO_Asynch_Queued_Message (buf,
00161                                                  this->orb_core_,
00162                                                  sz,
00163                                                  this->abs_timeout_,
00164                                                  0,
00165                                                  true),
00166                       0);
00167     }
00168 
00169   return qm;
00170 }
00171 
00172 void
00173 TAO_Asynch_Queued_Message::destroy (void)
00174 {
00175   if (this->is_heap_created_)
00176     {
00177       // If we have an allocator release the memory to the allocator
00178       // pool.
00179       if (this->allocator_)
00180         {
00181           ACE_DES_FREE (this,
00182                         this->allocator_->free,
00183                         TAO_Asynch_Queued_Message);
00184 
00185         }
00186       else // global release..
00187         {
00188           delete this;
00189         }
00190     }
00191 }
00192 
00193 bool
00194 TAO_Asynch_Queued_Message::is_expired (const ACE_Time_Value &now) const
00195 {
00196   if (this->abs_timeout_ > ACE_Time_Value::zero)
00197     {
00198       if (this->offset_ > 0)
00199         {
00200           return false; //never expire partial messages
00201         }
00202       return this->abs_timeout_ < now;
00203     }
00204   return false;
00205 }
00206 
00207 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:37:51 2010 for TAO by  doxygen 1.4.7