UPIPE_Stream.cpp

Go to the documentation of this file.
00001 // UPIPE_Stream.cpp,v 4.21 2005/10/28 23:55:10 ossama Exp
00002 
00003 #include "ace/UPIPE_Stream.h"
00004 
00005 ACE_RCSID(ace, UPIPE_Stream, "UPIPE_Stream.cpp,v 4.21 2005/10/28 23:55:10 ossama Exp")
00006 
00007 #if defined (ACE_HAS_THREADS)
00008 
00009 #include "ace/OS_NS_string.h"
00010 
00011 #if !defined (__ACE_INLINE__)
00012 #include "ace/UPIPE_Stream.inl"
00013 #endif /* __ACE_INLINE__ */
00014 
00015 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00016 
00017 ACE_ALLOC_HOOK_DEFINE(ACE_UPIPE_Stream)
00018 
00019 ACE_UPIPE_Stream::ACE_UPIPE_Stream (void)
00020   : mb_last_ (0),
00021     reference_count_ (0)
00022 {
00023   ACE_TRACE ("ACE_UPIPE_Stream::ACE_UPIPE_STREAM");
00024 }
00025 
00026 ACE_UPIPE_Stream::~ACE_UPIPE_Stream (void)
00027 {
00028   if (this->mb_last_ != 0)
00029     {
00030       this->mb_last_->release ();
00031       this->mb_last_ = 0;
00032     }
00033 }
00034 
00035 int
00036 ACE_UPIPE_Stream::control (int cmd,
00037                            void * val) const
00038 {
00039   ACE_TRACE ("ACE_UPIPE_Stream::control");
00040 
00041   return ((ACE_UPIPE_Stream *) this)->stream_.control
00042     ((ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds) cmd, val);
00043 }
00044 
00045 void
00046 ACE_UPIPE_Stream::dump (void) const
00047 {
00048 #if defined (ACE_HAS_DUMP)
00049   ACE_TRACE ("ACE_UPIPE_Stream::dump");
00050 #endif /* ACE_HAS_DUMP */
00051 }
00052 
00053 int
00054 ACE_UPIPE_Stream::close (void)
00055 {
00056   ACE_TRACE ("ACE_UPIPE_Stream::close");
00057   ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1));
00058 
00059   this->reference_count_--;
00060 
00061   if (this->reference_count_ == 0)
00062     {
00063       // Since the UPIPE should have been closed earlier we won't bother
00064       // checking to see if closing it now fails.
00065 
00066       if (this->ACE_SPIPE::get_handle () != ACE_INVALID_HANDLE)
00067         this->ACE_SPIPE::close ();
00068 
00069       // Close down the ACE_stream.
00070       return this->stream_.close ();
00071     }
00072   return 0;
00073 }
00074 
00075 int
00076 ACE_UPIPE_Stream::get_remote_addr (ACE_UPIPE_Addr &remote_sap) const
00077 {
00078   ACE_TRACE ("ACE_UPIPE_Stream::get_remote_addr");
00079   remote_sap = this->remote_addr_;
00080   return 0;
00081 }
00082 
00083 int
00084 ACE_UPIPE_Stream::send (ACE_Message_Block *mb_p,
00085                         ACE_Time_Value *timeout)
00086 {
00087   ACE_TRACE ("ACE_UPIPE_Stream::send_msg");
00088   return this->stream_.put (mb_p, timeout) == -1 ? -1 : 0;
00089 }
00090 
00091 int ACE_UPIPE_Stream::recv (ACE_Message_Block *& mb_p,
00092                             ACE_Time_Value *timeout)
00093 {
00094   return this->stream_.get (mb_p, timeout) == -1 ? -1 : 0;
00095 }
00096 
00097 // Send a buffer.
00098 
00099 ssize_t
00100 ACE_UPIPE_Stream::send (const char *buffer,
00101                         size_t n,
00102                         ACE_Time_Value *timeout)
00103 {
00104   ACE_TRACE ("ACE_UPIPE_Stream::send");
00105 
00106   ACE_Message_Block *mb_p;
00107   ACE_NEW_RETURN (mb_p,
00108                   ACE_Message_Block (n),
00109                   -1);
00110   mb_p->copy (buffer, n);
00111   return
00112     this->stream_.put (mb_p, timeout) == -1
00113     ? -1
00114     : static_cast<ssize_t> (n);
00115 }
00116 
00117 // Receive a buffer.
00118 
00119 ssize_t
00120 ACE_UPIPE_Stream::recv (char *buffer,
00121                         size_t n,
00122                         ACE_Time_Value *timeout)
00123 {
00124   ACE_TRACE ("ACE_UPIPE_Stream::recv");
00125   // Index in buffer.
00126   size_t bytes_read = 0;
00127 
00128   while (bytes_read < n)
00129     if (this->mb_last_ != 0)
00130       {
00131         // We have remaining data in our last read Message_Buffer.
00132         size_t this_len = this->mb_last_->length ();
00133         if (this_len < n)
00134           {
00135             // The remaining data is not enough.
00136 
00137             ACE_OS::memcpy ((void *) &buffer[bytes_read],
00138                             this->mb_last_->rd_ptr (),
00139                             this_len);
00140             bytes_read += this_len;
00141             this->mb_last_ = this->mb_last_->release ();   // mb_last_ now 0
00142             return bytes_read;
00143           }
00144         else
00145           {
00146             // The remaining data is at least enough.  If there's
00147             // more, we'll get it the next time through.
00148             ACE_OS::memcpy (&buffer[bytes_read],
00149                             this->mb_last_->rd_ptr (),
00150                             n);
00151             bytes_read += n;
00152 
00153             // Advance rd_ptr.
00154             this->mb_last_->rd_ptr (n);
00155 
00156             if (this->mb_last_->length () == 0)
00157               // Now the Message_Buffer is empty.
00158               this->mb_last_ = this->mb_last_->release ();
00159           }
00160       }
00161     else
00162       {
00163         // We have to get a new Message_Buffer from our stream.
00164         int result = this->stream_.get (this->mb_last_, timeout);
00165 
00166         if (result == -1)
00167           {
00168             if (errno == EWOULDBLOCK && bytes_read > 0)
00169               // Return the number of bytes read before we timed out.
00170               return bytes_read;
00171             else
00172               return -1;
00173           }
00174       }
00175 
00176   return bytes_read;
00177 }
00178 
00179 ssize_t
00180 ACE_UPIPE_Stream::send_n (const char *buf,
00181                           size_t n,
00182                           ACE_Time_Value *timeout)
00183 {
00184   ACE_TRACE ("ACE_UPIPE_Stream::send_n");
00185 
00186   size_t bytes_written;
00187   ssize_t len = 0;
00188 
00189   for (bytes_written = 0;
00190        bytes_written < n;
00191        bytes_written += len)
00192     {
00193       len = this->send (buf + bytes_written,
00194                         n - bytes_written,
00195                         timeout);
00196 
00197       if (len == -1)
00198         return -1;
00199     }
00200 
00201   return bytes_written;
00202 }
00203 
00204 ssize_t
00205 ACE_UPIPE_Stream::recv_n (char *buf,
00206                           size_t n,
00207                           ACE_Time_Value *timeout)
00208 {
00209   ACE_TRACE ("ACE_UPIPE_Stream::recv_n");
00210   size_t bytes_read;
00211   ssize_t len = 0;
00212 
00213   for (bytes_read = 0;
00214        bytes_read < n;
00215        bytes_read += len)
00216     {
00217       len = this->recv (buf + bytes_read,
00218                         n - bytes_read,
00219                         timeout);
00220       if (len == -1)
00221         return -1;
00222       else if (len == 0)
00223         break;
00224     }
00225 
00226   return bytes_read;
00227 }
00228 
00229 ACE_END_VERSIONED_NAMESPACE_DECL
00230 
00231 #endif /* ACE_HAS_THREADS */

Generated on Thu Nov 9 09:42:09 2006 for ACE by doxygen 1.3.6