00001
00002
00003 #include "ace/Pipe.h"
00004 #include "ace/SOCK_Acceptor.h"
00005 #include "ace/SOCK_Connector.h"
00006 #include "ace/Log_Msg.h"
00007 #include "ace/OS_NS_sys_socket.h"
00008 #include "ace/OS_Memory.h"
00009
00010 #if defined (ACE_HAS_STREAM_PIPES) || defined (__QNX__)
00011 # include "ace/OS_NS_unistd.h"
00012 #endif // ACE_HAS_STREAM_PIPES || __QNX__
00013
00014 #include "ace/os_include/netinet/os_tcp.h"
00015
00016 #if !defined (__ACE_INLINE__)
00017 #include "ace/Pipe.inl"
00018 #endif
00019
00020 ACE_RCSID(ace, Pipe, "Pipe.cpp,v 4.50 2006/01/23 14:02:02 schmidt Exp")
00021
00022 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00023
00024 void
00025 ACE_Pipe::dump (void) const
00026 {
00027 #if defined (ACE_HAS_DUMP)
00028 ACE_TRACE ("ACE_Pipe::dump");
00029 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00030 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("handles_[0] = %d"), this->handles_[0]));
00031 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nhandles_[1] = %d"), this->handles_[1]));
00032 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\n")));
00033 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00034 #endif
00035 }
00036
00037 int
00038 ACE_Pipe::open (int buffer_size)
00039 {
00040 ACE_TRACE ("ACE_Pipe::open");
00041
00042 #if defined (ACE_LACKS_SOCKETPAIR) || defined (__Lynx__)
00043 ACE_INET_Addr my_addr;
00044 ACE_SOCK_Acceptor acceptor;
00045 ACE_SOCK_Connector connector;
00046 ACE_SOCK_Stream reader;
00047 ACE_SOCK_Stream writer;
00048 int result = 0;
00049 # if defined (ACE_WIN32)
00050 ACE_INET_Addr local_any (static_cast<u_short> (0), ACE_LOCALHOST);
00051 # else
00052 ACE_Addr local_any = ACE_Addr::sap_any;
00053 # endif
00054
00055
00056 if (acceptor.open (local_any) == -1
00057 || acceptor.get_local_addr (my_addr) == -1)
00058 result = -1;
00059 else
00060 {
00061 ACE_INET_Addr sv_addr (my_addr.get_port_number (),
00062 ACE_LOCALHOST);
00063
00064
00065 if (connector.connect (writer, sv_addr) == -1)
00066 result = -1;
00067 else if (acceptor.accept (reader) == -1)
00068 {
00069 writer.close ();
00070 result = -1;
00071 }
00072 }
00073
00074
00075 acceptor.close ();
00076 if (result == -1)
00077 return -1;
00078
00079 this->handles_[0] = reader.get_handle ();
00080 this->handles_[1] = writer.get_handle ();
00081
00082 # if !defined (ACE_LACKS_TCP_NODELAY)
00083 int one = 1;
00084
00085
00086
00087
00088
00089 if (writer.set_option (ACE_IPPROTO_TCP,
00090 TCP_NODELAY,
00091 &one,
00092 sizeof one) == -1)
00093 {
00094 this->close ();
00095 return -1;
00096 }
00097 # endif
00098
00099 # if defined (ACE_LACKS_SOCKET_BUFSIZ)
00100 ACE_UNUSED_ARG (buffer_size);
00101 # else
00102 if (reader.set_option (SOL_SOCKET,
00103 SO_RCVBUF,
00104 reinterpret_cast <void *> (&buffer_size),
00105 sizeof (buffer_size)) == -1
00106 && errno != ENOTSUP)
00107 {
00108 this->close ();
00109 return -1;
00110 }
00111 else if (writer.set_option (SOL_SOCKET,
00112 SO_SNDBUF,
00113 reinterpret_cast <void *> (&buffer_size),
00114 sizeof (buffer_size)) == -1
00115 && errno != ENOTSUP)
00116 {
00117 this->close ();
00118 return -1;
00119 }
00120 # endif
00121
00122 #elif defined (ACE_HAS_STREAM_PIPES) || defined (__QNX__)
00123 ACE_UNUSED_ARG (buffer_size);
00124 if (ACE_OS::pipe (this->handles_) == -1)
00125 ACE_ERROR_RETURN ((LM_ERROR,
00126 ACE_LIB_TEXT ("%p\n"),
00127 ACE_LIB_TEXT ("pipe")),
00128 -1);
00129
00130 #if !defined(__QNX__)
00131 int arg = RMSGN;
00132
00133
00134
00135 if (ACE_OS::ioctl (this->handles_[0],
00136 I_SRDOPT,
00137 (void *) arg) == -1
00138 || ACE_OS::ioctl (this->handles_[1],
00139 I_SRDOPT,
00140 (void *) arg) == -1)
00141 {
00142 this->close ();
00143 ACE_ERROR_RETURN ((LM_ERROR,
00144 ACE_LIB_TEXT ("%p\n"),
00145 ACE_LIB_TEXT ("ioctl")), -1);
00146 }
00147 #endif
00148
00149 #else
00150 if (ACE_OS::socketpair (AF_UNIX,
00151 SOCK_STREAM,
00152 0,
00153 this->handles_) == -1)
00154 ACE_ERROR_RETURN ((LM_ERROR,
00155 ACE_LIB_TEXT ("%p\n"),
00156 ACE_LIB_TEXT ("socketpair")),
00157 -1);
00158 # if defined (ACE_LACKS_SOCKET_BUFSIZ)
00159 ACE_UNUSED_ARG (buffer_size);
00160 # else
00161 if (ACE_OS::setsockopt (this->handles_[0],
00162 SOL_SOCKET,
00163 SO_RCVBUF,
00164 reinterpret_cast <const char *> (&buffer_size),
00165 sizeof (buffer_size)) == -1
00166 && errno != ENOTSUP)
00167 {
00168 this->close ();
00169 return -1;
00170 }
00171 if (ACE_OS::setsockopt (this->handles_[1],
00172 SOL_SOCKET,
00173 SO_SNDBUF,
00174 reinterpret_cast <const char *> (&buffer_size),
00175 sizeof (buffer_size)) == -1
00176 && errno != ENOTSUP)
00177 {
00178 this->close ();
00179 return -1;
00180 }
00181 # endif
00182 #endif
00183
00184
00185
00186 return 0;
00187 }
00188
00189 int
00190 ACE_Pipe::open (ACE_HANDLE handles[2])
00191 {
00192 ACE_TRACE ("ACE_Pipe::open");
00193
00194 if (this->open () == -1)
00195 return -1;
00196 else
00197 {
00198 handles[0] = this->handles_[0];
00199 handles[1] = this->handles_[1];
00200 return 0;
00201 }
00202 }
00203
00204
00205
00206 ACE_Pipe::ACE_Pipe (void)
00207 {
00208 ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00209
00210 this->handles_[0] = ACE_INVALID_HANDLE;
00211 this->handles_[1] = ACE_INVALID_HANDLE;
00212 }
00213
00214 ACE_Pipe::ACE_Pipe (ACE_HANDLE handles[2])
00215 {
00216 ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00217
00218 if (this->open (handles) == -1)
00219 ACE_ERROR ((LM_ERROR,
00220 ACE_LIB_TEXT ("ACE_Pipe::ACE_Pipe")));
00221 }
00222
00223 ACE_Pipe::ACE_Pipe (ACE_HANDLE read,
00224 ACE_HANDLE write)
00225 {
00226 ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00227 this->handles_[0] = read;
00228 this->handles_[1] = write;
00229 }
00230
00231 int
00232 ACE_Pipe::close (void)
00233 {
00234 ACE_TRACE ("ACE_Pipe::close");
00235
00236 int result = 0;
00237
00238
00239
00240
00241
00242 if (this->handles_[0] != ACE_INVALID_HANDLE)
00243 result = ACE_OS::closesocket (this->handles_[0]);
00244 this->handles_[0] = ACE_INVALID_HANDLE;
00245
00246 if (this->handles_[1] != ACE_INVALID_HANDLE)
00247 result |= ACE_OS::closesocket (this->handles_[1]);
00248 this->handles_[1] = ACE_INVALID_HANDLE;
00249
00250 return result;
00251 }
00252
00253
00254
00255
00256
00257
00258 ssize_t
00259 ACE_Pipe::send (size_t n, ...) const
00260 {
00261 ACE_TRACE ("ACE_Pipe::send");
00262 va_list argp;
00263 size_t total_tuples = n / 2;
00264 iovec *iovp;
00265 #if defined (ACE_HAS_ALLOCA)
00266 iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00267 #else
00268 ACE_NEW_RETURN (iovp,
00269 iovec[total_tuples],
00270 -1);
00271 #endif
00272
00273 va_start (argp, n);
00274
00275 for (size_t i = 0; i < total_tuples; i++)
00276 {
00277 iovp[i].iov_base = va_arg (argp, char *);
00278 iovp[i].iov_len = va_arg (argp, int);
00279 }
00280
00281 #if defined (ACE_WIN32)
00282 ssize_t result = ACE::sendv (this->write_handle (),
00283 iovp,
00284 total_tuples);
00285 #else
00286 ssize_t result = ACE_OS::writev (this->write_handle (),
00287 iovp,
00288 total_tuples);
00289 #endif
00290
00291 #if !defined (ACE_HAS_ALLOCA)
00292 delete [] iovp;
00293 #endif
00294 va_end (argp);
00295 return result;
00296 }
00297
00298
00299
00300
00301
00302
00303
00304 ssize_t
00305 ACE_Pipe::recv (size_t n, ...) const
00306 {
00307 ACE_TRACE ("ACE_Pipe::recv");
00308 va_list argp;
00309 size_t total_tuples = n / 2;
00310 iovec *iovp;
00311 #if defined (ACE_HAS_ALLOCA)
00312 iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00313 #else
00314 ACE_NEW_RETURN (iovp,
00315 iovec[total_tuples],
00316 -1);
00317 #endif
00318
00319 va_start (argp, n);
00320
00321 for (size_t i = 0; i < total_tuples; i++)
00322 {
00323 iovp[i].iov_base = va_arg (argp, char *);
00324 iovp[i].iov_len = va_arg (argp, int);
00325 }
00326
00327 #if defined (ACE_WIN32)
00328 ssize_t result = ACE::recvv (this->read_handle (),
00329 iovp,
00330 total_tuples);
00331 #else
00332 ssize_t result = ACE_OS::readv (this->read_handle (),
00333 iovp,
00334 total_tuples);
00335 #endif
00336
00337 #if !defined (ACE_HAS_ALLOCA)
00338 delete [] iovp;
00339 #endif
00340 va_end (argp);
00341 return result;
00342 }
00343
00344 ACE_END_VERSIONED_NAMESPACE_DECL