Pipe.cpp

Go to the documentation of this file.
00001 // $Id: Pipe.cpp 80826 2008-03-04 14:51:23Z wotte $
00002 
00003 #include "ace/Pipe.h"
00004 #include "ace/SOCK_Acceptor.h"
00005 #include "ace/SOCK_Connector.h"
00006 #include "ace/Log_Msg.h"
00007 #include "ace/OS_NS_sys_socket.h"
00008 #include "ace/OS_Memory.h"
00009 #include "ace/Truncate.h"
00010 
00011 #if defined (ACE_HAS_STREAM_PIPES) || defined (__QNX__)
00012 #  include "ace/OS_NS_unistd.h"
00013 #endif  // ACE_HAS_STREAM_PIPES || __QNX__
00014 
00015 #include "ace/os_include/netinet/os_tcp.h"
00016 
00017 #if !defined (__ACE_INLINE__)
00018 #include "ace/Pipe.inl"
00019 #endif /* __ACE_INLINE__ */
00020 
00021 ACE_RCSID(ace, Pipe, "$Id: Pipe.cpp 80826 2008-03-04 14:51:23Z wotte $")
00022 
00023 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00024 
00025 void
00026 ACE_Pipe::dump (void) const
00027 {
00028 #if defined (ACE_HAS_DUMP)
00029   ACE_TRACE ("ACE_Pipe::dump");
00030   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00031   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("handles_[0] = %d"), this->handles_[0]));
00032   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("\nhandles_[1] = %d"), this->handles_[1]));
00033   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("\n")));
00034   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00035 #endif /* ACE_HAS_DUMP */
00036 }
00037 
00038 int
00039 ACE_Pipe::open (int buffer_size)
00040 {
00041   ACE_TRACE ("ACE_Pipe::open");
00042 
00043 #if defined (ACE_LACKS_SOCKETPAIR) || defined (__Lynx__)
00044   ACE_INET_Addr my_addr;
00045   ACE_SOCK_Acceptor acceptor;
00046   ACE_SOCK_Connector connector;
00047   ACE_SOCK_Stream reader;
00048   ACE_SOCK_Stream writer;
00049   int result = 0;
00050 # if defined (ACE_WIN32)
00051   ACE_INET_Addr local_any  (static_cast<u_short> (0), ACE_LOCALHOST);
00052 # else
00053   ACE_Addr local_any = ACE_Addr::sap_any;
00054 # endif /* ACE_WIN32 */
00055 
00056   // Bind listener to any port and then find out what the port was.
00057   if (acceptor.open (local_any) == -1
00058       || acceptor.get_local_addr (my_addr) == -1)
00059     result = -1;
00060   else
00061     {
00062       ACE_INET_Addr sv_addr (my_addr.get_port_number (),
00063                              ACE_LOCALHOST);
00064 
00065       // Establish a connection within the same process.
00066       if (connector.connect (writer, sv_addr) == -1)
00067         result = -1;
00068       else if (acceptor.accept (reader) == -1)
00069         {
00070           writer.close ();
00071           result = -1;
00072         }
00073     }
00074 
00075   // Close down the acceptor endpoint since we don't need it anymore.
00076   acceptor.close ();
00077   if (result == -1)
00078     return -1;
00079 
00080   this->handles_[0] = reader.get_handle ();
00081   this->handles_[1] = writer.get_handle ();
00082 
00083 # if !defined (ACE_LACKS_TCP_NODELAY)
00084   int one = 1;
00085 
00086   // Make sure that the TCP stack doesn't try to buffer small writes.
00087   // Since this communication is purely local to the host it doesn't
00088   // affect network performance.
00089 
00090   if (writer.set_option (ACE_IPPROTO_TCP,
00091                          TCP_NODELAY,
00092                          &one,
00093                          sizeof one) == -1)
00094     {
00095       this->close ();
00096       return -1;
00097     }
00098 # endif /* ! ACE_LACKS_TCP_NODELAY */
00099 
00100 # if defined (ACE_LACKS_SOCKET_BUFSIZ)
00101     ACE_UNUSED_ARG (buffer_size);
00102 # else  /* ! ACE_LACKS_SOCKET_BUFSIZ */
00103   if (reader.set_option (SOL_SOCKET,
00104                          SO_RCVBUF,
00105                          reinterpret_cast <void *> (&buffer_size),
00106                          sizeof (buffer_size)) == -1
00107       && errno != ENOTSUP)
00108     {
00109       this->close ();
00110       return -1;
00111     }
00112   else if (writer.set_option (SOL_SOCKET,
00113                               SO_SNDBUF,
00114                               reinterpret_cast <void *> (&buffer_size),
00115                               sizeof (buffer_size)) == -1
00116            && errno != ENOTSUP)
00117     {
00118       this->close ();
00119       return -1;
00120     }
00121 # endif /* ! ACE_LACKS_SOCKET_BUFSIZ */
00122 
00123 #elif defined (ACE_HAS_STREAM_PIPES) || defined (__QNX__)
00124   ACE_UNUSED_ARG (buffer_size);
00125   if (ACE_OS::pipe (this->handles_) == -1)
00126     ACE_ERROR_RETURN ((LM_ERROR,
00127                        ACE_TEXT ("%p\n"),
00128                        ACE_TEXT ("pipe")),
00129                       -1);
00130 
00131 #if !defined(__QNX__)
00132   int arg = RMSGN;
00133 
00134   // Enable "msg no discard" mode, which ensures that record
00135   // boundaries are maintained when messages are sent and received.
00136   if (ACE_OS::ioctl (this->handles_[0],
00137                      I_SRDOPT,
00138                      (void *) arg) == -1
00139       || ACE_OS::ioctl (this->handles_[1],
00140                         I_SRDOPT,
00141                         (void *) arg) == -1)
00142     {
00143       this->close ();
00144       ACE_ERROR_RETURN ((LM_ERROR,
00145                          ACE_TEXT ("%p\n"),
00146                          ACE_TEXT ("ioctl")), -1);
00147     }
00148 #endif /* __QNX__ */
00149 
00150 #else  /* ! ACE_LACKS_SOCKETPAIR && ! ACE_HAS_STREAM_PIPES */
00151   if (ACE_OS::socketpair (AF_UNIX,
00152                           SOCK_STREAM,
00153                           0,
00154                           this->handles_) == -1)
00155     ACE_ERROR_RETURN ((LM_ERROR,
00156                        ACE_TEXT ("%p\n"),
00157                        ACE_TEXT ("socketpair")),
00158                       -1);
00159 # if defined (ACE_LACKS_SOCKET_BUFSIZ)
00160   ACE_UNUSED_ARG (buffer_size);
00161 # else  /* ! ACE_LACKS_SOCKET_BUFSIZ */
00162   if (ACE_OS::setsockopt (this->handles_[0],
00163                           SOL_SOCKET,
00164                           SO_RCVBUF,
00165                           reinterpret_cast <const char *> (&buffer_size),
00166                           sizeof (buffer_size)) == -1
00167       && errno != ENOTSUP)
00168     {
00169       this->close ();
00170       return -1;
00171     }
00172   if (ACE_OS::setsockopt (this->handles_[1],
00173                           SOL_SOCKET,
00174                           SO_SNDBUF,
00175                           reinterpret_cast <const char *> (&buffer_size),
00176                           sizeof (buffer_size)) == -1
00177       && errno != ENOTSUP)
00178     {
00179       this->close ();
00180       return -1;
00181     }
00182 # endif /* ! ACE_LACKS_SOCKET_BUFSIZ */
00183 # if defined (ACE_OPENVMS) && !defined (ACE_LACKS_TCP_NODELAY)
00184   int one = 1;
00185   // OpenVMS implements socketpair(AF_UNIX...) by returning AF_INET sockets.
00186   // Since these are plagued by Nagle as any other INET socket we need to set
00187   // TCP_NODELAY on the write handle.
00188   if (ACE_OS::setsockopt (this->handles_[1],
00189                           ACE_IPPROTO_TCP,
00190                           TCP_NODELAY,
00191                           reinterpret_cast <const char *> (&one),
00192                           sizeof (one)) == -1)
00193     {
00194       this->close ();
00195       return -1;
00196     }
00197 # endif /* ACE_OPENVMS && !ACE_LACKS_TCP_NODELAY */
00198 #endif  /* ! ACE_LACKS_SOCKETPAIR && ! ACE_HAS_STREAM_PIPES */
00199   // Point both the read and write HANDLES to the appropriate socket
00200   // HANDLEs.
00201 
00202   return 0;
00203 }
00204 
00205 int
00206 ACE_Pipe::open (ACE_HANDLE handles[2])
00207 {
00208   ACE_TRACE ("ACE_Pipe::open");
00209 
00210   if (this->open () == -1)
00211     return -1;
00212   else
00213     {
00214       handles[0] = this->handles_[0];
00215       handles[1] = this->handles_[1];
00216       return 0;
00217     }
00218 }
00219 
00220 // Do nothing...
00221 
00222 ACE_Pipe::ACE_Pipe (void)
00223 {
00224   ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00225 
00226   this->handles_[0] = ACE_INVALID_HANDLE;
00227   this->handles_[1] = ACE_INVALID_HANDLE;
00228 }
00229 
00230 ACE_Pipe::ACE_Pipe (ACE_HANDLE handles[2])
00231 {
00232   ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00233 
00234   if (this->open (handles) == -1)
00235     ACE_ERROR ((LM_ERROR,
00236                 ACE_TEXT ("ACE_Pipe::ACE_Pipe")));
00237 }
00238 
00239 ACE_Pipe::ACE_Pipe (ACE_HANDLE read,
00240                     ACE_HANDLE write)
00241 {
00242   ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00243   this->handles_[0] = read;
00244   this->handles_[1] = write;
00245 }
00246 
00247 int
00248 ACE_Pipe::close (void)
00249 {
00250   ACE_TRACE ("ACE_Pipe::close");
00251 
00252   int result = 0;
00253 
00254   // Note that the following will work even if we aren't closing down
00255   // sockets because <ACE_OS::closesocket> will just call <::close> in
00256   // that case!
00257 
00258   if (this->handles_[0] != ACE_INVALID_HANDLE)
00259     result = ACE_OS::closesocket (this->handles_[0]);
00260   this->handles_[0] = ACE_INVALID_HANDLE;
00261 
00262   if (this->handles_[1] != ACE_INVALID_HANDLE)
00263     result |= ACE_OS::closesocket (this->handles_[1]);
00264   this->handles_[1] = ACE_INVALID_HANDLE;
00265 
00266   return result;
00267 }
00268 
00269 // Send N char *ptrs and int lengths.  Note that the char *'s precede
00270 // the ints (basically, an varargs version of writev).  The count N is
00271 // the *total* number of trailing arguments, *not* a couple of the
00272 // number of tuple pairs!
00273 
00274 ssize_t
00275 ACE_Pipe::send (size_t n, ...) const
00276 {
00277   ACE_TRACE ("ACE_Pipe::send");
00278   va_list argp;
00279   int total_tuples = ACE_Utils::truncate_cast<int> (n / 2);
00280   iovec *iovp;
00281 #if defined (ACE_HAS_ALLOCA)
00282   iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00283 #else
00284   ACE_NEW_RETURN (iovp,
00285                   iovec[total_tuples],
00286                   -1);
00287 #endif /* !defined (ACE_HAS_ALLOCA) */
00288 
00289   va_start (argp, n);
00290 
00291   for (int i = 0; i < total_tuples; ++i)
00292     {
00293       iovp[i].iov_base = va_arg (argp, char *);
00294       iovp[i].iov_len  = va_arg (argp, int);
00295     }
00296 
00297 #if defined (ACE_WIN32)
00298   ssize_t result = ACE::sendv (this->write_handle (),
00299                                iovp,
00300                                total_tuples);
00301 #else
00302   ssize_t result = ACE_OS::writev (this->write_handle (),
00303                                    iovp,
00304                                    total_tuples);
00305 #endif /* ACE_WIN32 */
00306 
00307 #if !defined (ACE_HAS_ALLOCA)
00308   delete [] iovp;
00309 #endif /* !defined (ACE_HAS_ALLOCA) */
00310   va_end (argp);
00311   return result;
00312 }
00313 
00314 // This is basically an interface to ACE_OS::readv, that doesn't use
00315 // the struct iovec explicitly.  The ... can be passed as an arbitrary
00316 // number of (char *ptr, int len) tuples.  However, the count N is the
00317 // *total* number of trailing arguments, *not* a couple of the number
00318 // of tuple pairs!
00319 
00320 ssize_t
00321 ACE_Pipe::recv (size_t n, ...) const
00322 {
00323   ACE_TRACE ("ACE_Pipe::recv");
00324   va_list argp;
00325   int total_tuples = ACE_Utils::truncate_cast<int> (n / 2);
00326   iovec *iovp;
00327 #if defined (ACE_HAS_ALLOCA)
00328   iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00329 #else
00330   ACE_NEW_RETURN (iovp,
00331                   iovec[total_tuples],
00332                   -1);
00333 #endif /* !defined (ACE_HAS_ALLOCA) */
00334 
00335   va_start (argp, n);
00336 
00337   for (int i = 0; i < total_tuples; ++i)
00338     {
00339       iovp[i].iov_base = va_arg (argp, char *);
00340       iovp[i].iov_len  = va_arg (argp, int);
00341     }
00342 
00343 #if defined (ACE_WIN32)
00344   ssize_t const result = ACE::recvv (this->read_handle (),
00345                                      iovp,
00346                                      total_tuples);
00347 #else
00348   ssize_t const result = ACE_OS::readv (this->read_handle (),
00349                                         iovp,
00350                                         total_tuples);
00351 #endif /* ACE_WIN32 */
00352 
00353 #if !defined (ACE_HAS_ALLOCA)
00354   delete [] iovp;
00355 #endif /* !defined (ACE_HAS_ALLOCA) */
00356   va_end (argp);
00357   return result;
00358 }
00359 
00360 ACE_END_VERSIONED_NAMESPACE_DECL

Generated on Tue Feb 2 17:18:41 2010 for ACE by  doxygen 1.4.7