00001
00002
00003 #include "ace/Pipe.h"
00004 #include "ace/SOCK_Acceptor.h"
00005 #include "ace/SOCK_Connector.h"
00006 #include "ace/Log_Msg.h"
00007 #include "ace/OS_NS_sys_socket.h"
00008 #include "ace/OS_Memory.h"
00009 #include "ace/Truncate.h"
00010
00011 #if defined (ACE_HAS_STREAM_PIPES) || defined (__QNX__)
00012 # include "ace/OS_NS_unistd.h"
00013 #endif // ACE_HAS_STREAM_PIPES || __QNX__
00014
00015 #include "ace/os_include/netinet/os_tcp.h"
00016
00017 #if !defined (__ACE_INLINE__)
00018 #include "ace/Pipe.inl"
00019 #endif
00020
00021 ACE_RCSID(ace, Pipe, "$Id: Pipe.cpp 87115 2009-10-15 11:27:23Z olli $")
00022
00023 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00024
00025 void
00026 ACE_Pipe::dump (void) const
00027 {
00028 #if defined (ACE_HAS_DUMP)
00029 ACE_TRACE ("ACE_Pipe::dump");
00030 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00031 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("handles_[0] = %d"), this->handles_[0]));
00032 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("\nhandles_[1] = %d\n"), this->handles_[1]));
00033 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00034 #endif
00035 }
00036
00037 int
00038 ACE_Pipe::open (int buffer_size)
00039 {
00040 ACE_TRACE ("ACE_Pipe::open");
00041
00042 #if defined (ACE_LACKS_SOCKETPAIR)
00043 ACE_INET_Addr my_addr;
00044 ACE_SOCK_Acceptor acceptor;
00045 ACE_SOCK_Connector connector;
00046 ACE_SOCK_Stream reader;
00047 ACE_SOCK_Stream writer;
00048 int result = 0;
00049 # if defined (ACE_WIN32)
00050 ACE_INET_Addr local_any (static_cast<u_short> (0), ACE_LOCALHOST);
00051 # else
00052 ACE_Addr local_any = ACE_Addr::sap_any;
00053 # endif
00054
00055
00056 if (acceptor.open (local_any) == -1
00057 || acceptor.get_local_addr (my_addr) == -1)
00058 result = -1;
00059 else
00060 {
00061 ACE_INET_Addr sv_addr (my_addr.get_port_number (),
00062 ACE_LOCALHOST);
00063
00064
00065 if (connector.connect (writer, sv_addr) == -1)
00066 result = -1;
00067 else if (acceptor.accept (reader) == -1)
00068 {
00069 writer.close ();
00070 result = -1;
00071 }
00072 }
00073
00074
00075 acceptor.close ();
00076 if (result == -1)
00077 return -1;
00078
00079 this->handles_[0] = reader.get_handle ();
00080 this->handles_[1] = writer.get_handle ();
00081
00082 # if !defined (ACE_LACKS_TCP_NODELAY)
00083 int one = 1;
00084
00085
00086
00087
00088
00089 if (writer.set_option (ACE_IPPROTO_TCP,
00090 TCP_NODELAY,
00091 &one,
00092 sizeof one) == -1)
00093 {
00094 this->close ();
00095 return -1;
00096 }
00097 # endif
00098
00099 # if defined (ACE_LACKS_SO_RCVBUF) && defined (ACE_LACKS_SO_SNDBUF)
00100 ACE_UNUSED_ARG (buffer_size);
00101 # endif
00102 # if !defined (ACE_LACKS_SO_RCVBUF)
00103 if (reader.set_option (SOL_SOCKET,
00104 SO_RCVBUF,
00105 reinterpret_cast <void *> (&buffer_size),
00106 sizeof (buffer_size)) == -1
00107 && errno != ENOTSUP)
00108 {
00109 this->close ();
00110 return -1;
00111 }
00112 # endif
00113 # if !defined (ACE_LACKS_SO_SNDBUF)
00114 if (writer.set_option (SOL_SOCKET,
00115 SO_SNDBUF,
00116 reinterpret_cast <void *> (&buffer_size),
00117 sizeof (buffer_size)) == -1
00118 && errno != ENOTSUP)
00119 {
00120 this->close ();
00121 return -1;
00122 }
00123 # endif
00124
00125 #elif defined (ACE_HAS_STREAM_PIPES) || defined (__QNX__)
00126 ACE_UNUSED_ARG (buffer_size);
00127 if (ACE_OS::pipe (this->handles_) == -1)
00128 ACE_ERROR_RETURN ((LM_ERROR,
00129 ACE_TEXT ("%p\n"),
00130 ACE_TEXT ("pipe")),
00131 -1);
00132
00133 #if !defined(__QNX__)
00134 int arg = RMSGN;
00135
00136
00137
00138 if (ACE_OS::ioctl (this->handles_[0],
00139 I_SRDOPT,
00140 (void *) arg) == -1
00141 || ACE_OS::ioctl (this->handles_[1],
00142 I_SRDOPT,
00143 (void *) arg) == -1)
00144 {
00145 this->close ();
00146 ACE_ERROR_RETURN ((LM_ERROR,
00147 ACE_TEXT ("%p\n"),
00148 ACE_TEXT ("ioctl")), -1);
00149 }
00150 #endif
00151
00152 #else
00153 if (ACE_OS::socketpair (AF_UNIX,
00154 SOCK_STREAM,
00155 0,
00156 this->handles_) == -1)
00157 ACE_ERROR_RETURN ((LM_ERROR,
00158 ACE_TEXT ("%p\n"),
00159 ACE_TEXT ("socketpair")),
00160 -1);
00161 # if defined (ACE_LACKS_SO_SNDBUF) && defined (ACE_LACKS_SO_RCVBUF)
00162 ACE_UNUSED_ARG (buffer_size);
00163 # endif
00164 # if !defined (ACE_LACKS_SO_RCVBUF)
00165 if (ACE_OS::setsockopt (this->handles_[0],
00166 SOL_SOCKET,
00167 SO_RCVBUF,
00168 reinterpret_cast <const char *> (&buffer_size),
00169 sizeof (buffer_size)) == -1
00170 && errno != ENOTSUP)
00171 {
00172 this->close ();
00173 return -1;
00174 }
00175 # endif
00176 # if !defined (ACE_LACKS_SO_SNDBUF)
00177 if (ACE_OS::setsockopt (this->handles_[1],
00178 SOL_SOCKET,
00179 SO_SNDBUF,
00180 reinterpret_cast <const char *> (&buffer_size),
00181 sizeof (buffer_size)) == -1
00182 && errno != ENOTSUP)
00183 {
00184 this->close ();
00185 return -1;
00186 }
00187 # endif
00188 # if defined (ACE_OPENVMS) && !defined (ACE_LACKS_TCP_NODELAY)
00189 int one = 1;
00190
00191
00192
00193 if (ACE_OS::setsockopt (this->handles_[1],
00194 ACE_IPPROTO_TCP,
00195 TCP_NODELAY,
00196 reinterpret_cast <const char *> (&one),
00197 sizeof (one)) == -1)
00198 {
00199 this->close ();
00200 return -1;
00201 }
00202 # endif
00203 #endif
00204
00205
00206
00207 return 0;
00208 }
00209
00210 int
00211 ACE_Pipe::open (ACE_HANDLE handles[2])
00212 {
00213 ACE_TRACE ("ACE_Pipe::open");
00214
00215 if (this->open () == -1)
00216 return -1;
00217 else
00218 {
00219 handles[0] = this->handles_[0];
00220 handles[1] = this->handles_[1];
00221 return 0;
00222 }
00223 }
00224
00225
00226
00227 ACE_Pipe::ACE_Pipe (void)
00228 {
00229 ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00230
00231 this->handles_[0] = ACE_INVALID_HANDLE;
00232 this->handles_[1] = ACE_INVALID_HANDLE;
00233 }
00234
00235 ACE_Pipe::ACE_Pipe (ACE_HANDLE handles[2])
00236 {
00237 ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00238
00239 if (this->open (handles) == -1)
00240 ACE_ERROR ((LM_ERROR,
00241 ACE_TEXT ("ACE_Pipe::ACE_Pipe")));
00242 }
00243
00244 ACE_Pipe::ACE_Pipe (ACE_HANDLE read,
00245 ACE_HANDLE write)
00246 {
00247 ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00248 this->handles_[0] = read;
00249 this->handles_[1] = write;
00250 }
00251
00252 int
00253 ACE_Pipe::close (void)
00254 {
00255 ACE_TRACE ("ACE_Pipe::close");
00256
00257 int result = 0;
00258
00259
00260
00261
00262
00263 if (this->handles_[0] != ACE_INVALID_HANDLE)
00264 result = ACE_OS::closesocket (this->handles_[0]);
00265 this->handles_[0] = ACE_INVALID_HANDLE;
00266
00267 if (this->handles_[1] != ACE_INVALID_HANDLE)
00268 result |= ACE_OS::closesocket (this->handles_[1]);
00269 this->handles_[1] = ACE_INVALID_HANDLE;
00270
00271 return result;
00272 }
00273
00274
00275
00276
00277
00278
00279 ssize_t
00280 ACE_Pipe::send (size_t n, ...) const
00281 {
00282 ACE_TRACE ("ACE_Pipe::send");
00283 va_list argp;
00284 int total_tuples = ACE_Utils::truncate_cast<int> (n / 2);
00285 iovec *iovp;
00286 #if defined (ACE_HAS_ALLOCA)
00287 iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00288 #else
00289 ACE_NEW_RETURN (iovp,
00290 iovec[total_tuples],
00291 -1);
00292 #endif
00293
00294 va_start (argp, n);
00295
00296 for (int i = 0; i < total_tuples; ++i)
00297 {
00298 iovp[i].iov_base = va_arg (argp, char *);
00299 iovp[i].iov_len = va_arg (argp, int);
00300 }
00301
00302 #if defined (ACE_WIN32)
00303 ssize_t result = ACE::sendv (this->write_handle (),
00304 iovp,
00305 total_tuples);
00306 #else
00307 ssize_t result = ACE_OS::writev (this->write_handle (),
00308 iovp,
00309 total_tuples);
00310 #endif
00311
00312 #if !defined (ACE_HAS_ALLOCA)
00313 delete [] iovp;
00314 #endif
00315 va_end (argp);
00316 return result;
00317 }
00318
00319
00320
00321
00322
00323
00324
00325 ssize_t
00326 ACE_Pipe::recv (size_t n, ...) const
00327 {
00328 ACE_TRACE ("ACE_Pipe::recv");
00329 va_list argp;
00330 int total_tuples = ACE_Utils::truncate_cast<int> (n / 2);
00331 iovec *iovp;
00332 #if defined (ACE_HAS_ALLOCA)
00333 iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00334 #else
00335 ACE_NEW_RETURN (iovp,
00336 iovec[total_tuples],
00337 -1);
00338 #endif
00339
00340 va_start (argp, n);
00341
00342 for (int i = 0; i < total_tuples; ++i)
00343 {
00344 iovp[i].iov_base = va_arg (argp, char *);
00345 iovp[i].iov_len = va_arg (argp, int);
00346 }
00347
00348 #if defined (ACE_WIN32)
00349 ssize_t const result = ACE::recvv (this->read_handle (),
00350 iovp,
00351 total_tuples);
00352 #else
00353 ssize_t const result = ACE_OS::readv (this->read_handle (),
00354 iovp,
00355 total_tuples);
00356 #endif
00357
00358 #if !defined (ACE_HAS_ALLOCA)
00359 delete [] iovp;
00360 #endif
00361 va_end (argp);
00362 return result;
00363 }
00364
00365 ACE_END_VERSIONED_NAMESPACE_DECL