00001
00002
00003 #include "ace/UPIPE_Stream.h"
00004
00005 ACE_RCSID(ace, UPIPE_Stream, "$Id: UPIPE_Stream.cpp 77242 2007-02-20 15:09:37Z elliott_c $")
00006
00007 #if defined (ACE_HAS_THREADS)
00008
00009 #include "ace/OS_NS_string.h"
00010
00011 #if !defined (__ACE_INLINE__)
00012 #include "ace/UPIPE_Stream.inl"
00013 #endif
00014
00015 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00016
00017 ACE_ALLOC_HOOK_DEFINE(ACE_UPIPE_Stream)
00018
00019 ACE_UPIPE_Stream::ACE_UPIPE_Stream (void)
00020 : mb_last_ (0),
00021 reference_count_ (0)
00022 {
00023 ACE_TRACE ("ACE_UPIPE_Stream::ACE_UPIPE_STREAM");
00024 }
00025
00026 ACE_UPIPE_Stream::~ACE_UPIPE_Stream (void)
00027 {
00028 if (this->mb_last_ != 0)
00029 {
00030 this->mb_last_->release ();
00031 this->mb_last_ = 0;
00032 }
00033 }
00034
00035 int
00036 ACE_UPIPE_Stream::control (int cmd,
00037 void * val) const
00038 {
00039 ACE_TRACE ("ACE_UPIPE_Stream::control");
00040
00041 return ((ACE_UPIPE_Stream *) this)->stream_.control
00042 ((ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds) cmd, val);
00043 }
00044
00045 void
00046 ACE_UPIPE_Stream::dump (void) const
00047 {
00048 #if defined (ACE_HAS_DUMP)
00049 ACE_TRACE ("ACE_UPIPE_Stream::dump");
00050 #endif
00051 }
00052
00053 int
00054 ACE_UPIPE_Stream::close (void)
00055 {
00056 ACE_TRACE ("ACE_UPIPE_Stream::close");
00057 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1));
00058
00059 this->reference_count_--;
00060
00061 if (this->reference_count_ == 0)
00062 {
00063
00064
00065
00066 if (this->ACE_SPIPE::get_handle () != ACE_INVALID_HANDLE)
00067 this->ACE_SPIPE::close ();
00068
00069
00070 return this->stream_.close ();
00071 }
00072 return 0;
00073 }
00074
00075 int
00076 ACE_UPIPE_Stream::get_remote_addr (ACE_UPIPE_Addr &remote_sap) const
00077 {
00078 ACE_TRACE ("ACE_UPIPE_Stream::get_remote_addr");
00079 remote_sap = this->remote_addr_;
00080 return 0;
00081 }
00082
00083 int
00084 ACE_UPIPE_Stream::send (ACE_Message_Block *mb_p,
00085 ACE_Time_Value *timeout)
00086 {
00087 ACE_TRACE ("ACE_UPIPE_Stream::send_msg");
00088 return this->stream_.put (mb_p, timeout) == -1 ? -1 : 0;
00089 }
00090
00091 int ACE_UPIPE_Stream::recv (ACE_Message_Block *& mb_p,
00092 ACE_Time_Value *timeout)
00093 {
00094 return this->stream_.get (mb_p, timeout) == -1 ? -1 : 0;
00095 }
00096
00097
00098
00099 ssize_t
00100 ACE_UPIPE_Stream::send (const char *buffer,
00101 size_t n,
00102 ACE_Time_Value *timeout)
00103 {
00104 ACE_TRACE ("ACE_UPIPE_Stream::send");
00105
00106 ACE_Message_Block *mb_p;
00107 ACE_NEW_RETURN (mb_p,
00108 ACE_Message_Block (n),
00109 -1);
00110 mb_p->copy (buffer, n);
00111 return
00112 this->stream_.put (mb_p, timeout) == -1
00113 ? -1
00114 : static_cast<ssize_t> (n);
00115 }
00116
00117
00118
00119 ssize_t
00120 ACE_UPIPE_Stream::recv (char *buffer,
00121 size_t n,
00122 ACE_Time_Value *timeout)
00123 {
00124 ACE_TRACE ("ACE_UPIPE_Stream::recv");
00125
00126 size_t bytes_read = 0;
00127
00128 while (bytes_read < n)
00129 if (this->mb_last_ != 0)
00130 {
00131
00132 size_t this_len = this->mb_last_->length ();
00133 if (this_len < n)
00134 {
00135
00136
00137 ACE_OS::memcpy ((void *) &buffer[bytes_read],
00138 this->mb_last_->rd_ptr (),
00139 this_len);
00140 bytes_read += this_len;
00141 this->mb_last_ = this->mb_last_->release ();
00142 return static_cast<ssize_t> (bytes_read);
00143 }
00144 else
00145 {
00146
00147
00148 ACE_OS::memcpy (&buffer[bytes_read],
00149 this->mb_last_->rd_ptr (),
00150 n);
00151 bytes_read += n;
00152
00153
00154 this->mb_last_->rd_ptr (n);
00155
00156 if (this->mb_last_->length () == 0)
00157
00158 this->mb_last_ = this->mb_last_->release ();
00159 }
00160 }
00161 else
00162 {
00163
00164 int result = this->stream_.get (this->mb_last_, timeout);
00165
00166 if (result == -1)
00167 {
00168 if (errno == EWOULDBLOCK && bytes_read > 0)
00169
00170 return static_cast<ssize_t> (bytes_read);
00171 else
00172 return -1;
00173 }
00174 }
00175
00176 return static_cast<ssize_t> (bytes_read);
00177 }
00178
00179 ssize_t
00180 ACE_UPIPE_Stream::send_n (const char *buf,
00181 size_t n,
00182 ACE_Time_Value *timeout)
00183 {
00184 ACE_TRACE ("ACE_UPIPE_Stream::send_n");
00185
00186 size_t bytes_written;
00187 ssize_t len = 0;
00188
00189 for (bytes_written = 0;
00190 bytes_written < n;
00191 bytes_written += len)
00192 {
00193 len = this->send (buf + bytes_written,
00194 n - bytes_written,
00195 timeout);
00196
00197 if (len == -1)
00198 return -1;
00199 }
00200
00201 return bytes_written;
00202 }
00203
00204 ssize_t
00205 ACE_UPIPE_Stream::recv_n (char *buf,
00206 size_t n,
00207 ACE_Time_Value *timeout)
00208 {
00209 ACE_TRACE ("ACE_UPIPE_Stream::recv_n");
00210 size_t bytes_read;
00211 ssize_t len = 0;
00212
00213 for (bytes_read = 0;
00214 bytes_read < n;
00215 bytes_read += len)
00216 {
00217 len = this->recv (buf + bytes_read,
00218 n - bytes_read,
00219 timeout);
00220 if (len == -1)
00221 return -1;
00222 else if (len == 0)
00223 break;
00224 }
00225
00226 return bytes_read;
00227 }
00228
00229 ACE_END_VERSIONED_NAMESPACE_DECL
00230
00231 #endif