00001
00002
00003 #include "ace/Pipe.h"
00004 #include "ace/SOCK_Acceptor.h"
00005 #include "ace/SOCK_Connector.h"
00006 #include "ace/Log_Msg.h"
00007 #include "ace/OS_NS_sys_socket.h"
00008 #include "ace/OS_Memory.h"
00009 #include "ace/Truncate.h"
00010
00011 #if defined (ACE_HAS_STREAM_PIPES) || defined (__QNX__)
00012 # include "ace/OS_NS_unistd.h"
00013 #endif // ACE_HAS_STREAM_PIPES || __QNX__
00014
00015 #include "ace/os_include/netinet/os_tcp.h"
00016
00017 #if !defined (__ACE_INLINE__)
00018 #include "ace/Pipe.inl"
00019 #endif
00020
00021 ACE_RCSID(ace, Pipe, "$Id: Pipe.cpp 79134 2007-07-31 18:23:50Z johnnyw $")
00022
00023 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00024
00025 void
00026 ACE_Pipe::dump (void) const
00027 {
00028 #if defined (ACE_HAS_DUMP)
00029 ACE_TRACE ("ACE_Pipe::dump");
00030 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00031 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("handles_[0] = %d"), this->handles_[0]));
00032 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("\nhandles_[1] = %d"), this->handles_[1]));
00033 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("\n")));
00034 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00035 #endif
00036 }
00037
00038 int
00039 ACE_Pipe::open (int buffer_size)
00040 {
00041 ACE_TRACE ("ACE_Pipe::open");
00042
00043 #if defined (ACE_LACKS_SOCKETPAIR) || defined (__Lynx__)
00044 ACE_INET_Addr my_addr;
00045 ACE_SOCK_Acceptor acceptor;
00046 ACE_SOCK_Connector connector;
00047 ACE_SOCK_Stream reader;
00048 ACE_SOCK_Stream writer;
00049 int result = 0;
00050 # if defined (ACE_WIN32)
00051 ACE_INET_Addr local_any (static_cast<u_short> (0), ACE_LOCALHOST);
00052 # else
00053 ACE_Addr local_any = ACE_Addr::sap_any;
00054 # endif
00055
00056
00057 if (acceptor.open (local_any) == -1
00058 || acceptor.get_local_addr (my_addr) == -1)
00059 result = -1;
00060 else
00061 {
00062 ACE_INET_Addr sv_addr (my_addr.get_port_number (),
00063 ACE_LOCALHOST);
00064
00065
00066 if (connector.connect (writer, sv_addr) == -1)
00067 result = -1;
00068 else if (acceptor.accept (reader) == -1)
00069 {
00070 writer.close ();
00071 result = -1;
00072 }
00073 }
00074
00075
00076 acceptor.close ();
00077 if (result == -1)
00078 return -1;
00079
00080 this->handles_[0] = reader.get_handle ();
00081 this->handles_[1] = writer.get_handle ();
00082
00083 # if !defined (ACE_LACKS_TCP_NODELAY)
00084 int one = 1;
00085
00086
00087
00088
00089
00090 if (writer.set_option (ACE_IPPROTO_TCP,
00091 TCP_NODELAY,
00092 &one,
00093 sizeof one) == -1)
00094 {
00095 this->close ();
00096 return -1;
00097 }
00098 # endif
00099
00100 # if defined (ACE_LACKS_SOCKET_BUFSIZ)
00101 ACE_UNUSED_ARG (buffer_size);
00102 # else
00103 if (reader.set_option (SOL_SOCKET,
00104 SO_RCVBUF,
00105 reinterpret_cast <void *> (&buffer_size),
00106 sizeof (buffer_size)) == -1
00107 && errno != ENOTSUP)
00108 {
00109 this->close ();
00110 return -1;
00111 }
00112 else if (writer.set_option (SOL_SOCKET,
00113 SO_SNDBUF,
00114 reinterpret_cast <void *> (&buffer_size),
00115 sizeof (buffer_size)) == -1
00116 && errno != ENOTSUP)
00117 {
00118 this->close ();
00119 return -1;
00120 }
00121 # endif
00122
00123 #elif defined (ACE_HAS_STREAM_PIPES) || defined (__QNX__)
00124 ACE_UNUSED_ARG (buffer_size);
00125 if (ACE_OS::pipe (this->handles_) == -1)
00126 ACE_ERROR_RETURN ((LM_ERROR,
00127 ACE_TEXT ("%p\n"),
00128 ACE_TEXT ("pipe")),
00129 -1);
00130
00131 #if !defined(__QNX__)
00132 int arg = RMSGN;
00133
00134
00135
00136 if (ACE_OS::ioctl (this->handles_[0],
00137 I_SRDOPT,
00138 (void *) arg) == -1
00139 || ACE_OS::ioctl (this->handles_[1],
00140 I_SRDOPT,
00141 (void *) arg) == -1)
00142 {
00143 this->close ();
00144 ACE_ERROR_RETURN ((LM_ERROR,
00145 ACE_TEXT ("%p\n"),
00146 ACE_TEXT ("ioctl")), -1);
00147 }
00148 #endif
00149
00150 #else
00151 if (ACE_OS::socketpair (AF_UNIX,
00152 SOCK_STREAM,
00153 0,
00154 this->handles_) == -1)
00155 ACE_ERROR_RETURN ((LM_ERROR,
00156 ACE_TEXT ("%p\n"),
00157 ACE_TEXT ("socketpair")),
00158 -1);
00159 # if defined (ACE_LACKS_SOCKET_BUFSIZ)
00160 ACE_UNUSED_ARG (buffer_size);
00161 # else
00162 if (ACE_OS::setsockopt (this->handles_[0],
00163 SOL_SOCKET,
00164 SO_RCVBUF,
00165 reinterpret_cast <const char *> (&buffer_size),
00166 sizeof (buffer_size)) == -1
00167 && errno != ENOTSUP)
00168 {
00169 this->close ();
00170 return -1;
00171 }
00172 if (ACE_OS::setsockopt (this->handles_[1],
00173 SOL_SOCKET,
00174 SO_SNDBUF,
00175 reinterpret_cast <const char *> (&buffer_size),
00176 sizeof (buffer_size)) == -1
00177 && errno != ENOTSUP)
00178 {
00179 this->close ();
00180 return -1;
00181 }
00182 # endif
00183 #endif
00184
00185
00186
00187 return 0;
00188 }
00189
00190 int
00191 ACE_Pipe::open (ACE_HANDLE handles[2])
00192 {
00193 ACE_TRACE ("ACE_Pipe::open");
00194
00195 if (this->open () == -1)
00196 return -1;
00197 else
00198 {
00199 handles[0] = this->handles_[0];
00200 handles[1] = this->handles_[1];
00201 return 0;
00202 }
00203 }
00204
00205
00206
00207 ACE_Pipe::ACE_Pipe (void)
00208 {
00209 ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00210
00211 this->handles_[0] = ACE_INVALID_HANDLE;
00212 this->handles_[1] = ACE_INVALID_HANDLE;
00213 }
00214
00215 ACE_Pipe::ACE_Pipe (ACE_HANDLE handles[2])
00216 {
00217 ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00218
00219 if (this->open (handles) == -1)
00220 ACE_ERROR ((LM_ERROR,
00221 ACE_TEXT ("ACE_Pipe::ACE_Pipe")));
00222 }
00223
00224 ACE_Pipe::ACE_Pipe (ACE_HANDLE read,
00225 ACE_HANDLE write)
00226 {
00227 ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00228 this->handles_[0] = read;
00229 this->handles_[1] = write;
00230 }
00231
00232 int
00233 ACE_Pipe::close (void)
00234 {
00235 ACE_TRACE ("ACE_Pipe::close");
00236
00237 int result = 0;
00238
00239
00240
00241
00242
00243 if (this->handles_[0] != ACE_INVALID_HANDLE)
00244 result = ACE_OS::closesocket (this->handles_[0]);
00245 this->handles_[0] = ACE_INVALID_HANDLE;
00246
00247 if (this->handles_[1] != ACE_INVALID_HANDLE)
00248 result |= ACE_OS::closesocket (this->handles_[1]);
00249 this->handles_[1] = ACE_INVALID_HANDLE;
00250
00251 return result;
00252 }
00253
00254
00255
00256
00257
00258
00259 ssize_t
00260 ACE_Pipe::send (size_t n, ...) const
00261 {
00262 ACE_TRACE ("ACE_Pipe::send");
00263 va_list argp;
00264 int total_tuples = ACE_Utils::truncate_cast<int> (n / 2);
00265 iovec *iovp;
00266 #if defined (ACE_HAS_ALLOCA)
00267 iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00268 #else
00269 ACE_NEW_RETURN (iovp,
00270 iovec[total_tuples],
00271 -1);
00272 #endif
00273
00274 va_start (argp, n);
00275
00276 for (int i = 0; i < total_tuples; ++i)
00277 {
00278 iovp[i].iov_base = va_arg (argp, char *);
00279 iovp[i].iov_len = va_arg (argp, int);
00280 }
00281
00282 #if defined (ACE_WIN32)
00283 ssize_t result = ACE::sendv (this->write_handle (),
00284 iovp,
00285 total_tuples);
00286 #else
00287 ssize_t result = ACE_OS::writev (this->write_handle (),
00288 iovp,
00289 total_tuples);
00290 #endif
00291
00292 #if !defined (ACE_HAS_ALLOCA)
00293 delete [] iovp;
00294 #endif
00295 va_end (argp);
00296 return result;
00297 }
00298
00299
00300
00301
00302
00303
00304
00305 ssize_t
00306 ACE_Pipe::recv (size_t n, ...) const
00307 {
00308 ACE_TRACE ("ACE_Pipe::recv");
00309 va_list argp;
00310 int total_tuples = ACE_Utils::truncate_cast<int> (n / 2);
00311 iovec *iovp;
00312 #if defined (ACE_HAS_ALLOCA)
00313 iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00314 #else
00315 ACE_NEW_RETURN (iovp,
00316 iovec[total_tuples],
00317 -1);
00318 #endif
00319
00320 va_start (argp, n);
00321
00322 for (int i = 0; i < total_tuples; ++i)
00323 {
00324 iovp[i].iov_base = va_arg (argp, char *);
00325 iovp[i].iov_len = va_arg (argp, int);
00326 }
00327
00328 #if defined (ACE_WIN32)
00329 ssize_t result = ACE::recvv (this->read_handle (),
00330 iovp,
00331 total_tuples);
00332 #else
00333 ssize_t result = ACE_OS::readv (this->read_handle (),
00334 iovp,
00335 total_tuples);
00336 #endif
00337
00338 #if !defined (ACE_HAS_ALLOCA)
00339 delete [] iovp;
00340 #endif
00341 va_end (argp);
00342 return result;
00343 }
00344
00345 ACE_END_VERSIONED_NAMESPACE_DECL