Public Member Functions | Private Attributes

ACE_Pipe Class Reference

Provides a portable bidirectional "pipe" abstraction. More...

#include <Pipe.h>

List of all members.

Public Member Functions

 ACE_Pipe (void)
 Default constructor (does nothing...).
 ACE_Pipe (ACE_HANDLE handles[2])
 Open the pipe and initialize the handles.
 ACE_Pipe (ACE_HANDLE read, ACE_HANDLE write)
 Initialize the ACE_Pipe from the read and write handles.
 ~ACE_Pipe (void)
 Default dtor. It doesn't close the handles for you.
int open (ACE_HANDLE handles[2])
 Open the pipe and initialize the handles.
int open (int buffer_size=ACE_DEFAULT_MAX_SOCKET_BUFSIZ)
 Open the pipe, setting the buffer size to the maximum.
int close (void)
 Close down the pipe HANDLEs.
ACE_HANDLE read_handle (void) const
ACE_HANDLE write_handle (void) const
void dump (void) const
 Dump the state of the object.
ssize_t send (const void *buf, size_t n) const
 Send up to n bytes from buf.
ssize_t recv (void *buf, size_t n) const
 Receive up to n bytes into buf.
ssize_t send_n (const void *buf, size_t n) const
 Send n bytes, keep trying until n are sent.
ssize_t send_n (const ACE_Message_Block *message_block, const ACE_Time_Value *timeout=0, size_t *bytes_transferred=0)
ssize_t recv_n (void *buf, size_t n) const
 Recv n bytes, keep trying until n are received.
ssize_t send (const iovec iov[], int n) const
 Send iovecs via <writev>.
ssize_t recv (iovec iov[], int n) const
 Recv iovecs via <readv>.
ssize_t send (size_t n,...) const
ssize_t recv (size_t n,...) const
ssize_t send (const void *buf, size_t n, ACE_OVERLAPPED *overlapped) const
 Send n bytes via Win32 WriteFile using overlapped I/O.
ssize_t recv (void *buf, size_t n, ACE_OVERLAPPED *overlapped) const
 Recv n bytes via Win32 ReadFile using overlapped I/O.
ssize_t sendv (const iovec iov[], int n) const
 Send an <iovec> of size n to the file.
ssize_t sendv_n (const iovec iov[], int n) const
ssize_t recvv_n (iovec iov[], int n) const
 Receive an iovec of size n from the pipe.

Private Attributes

ACE_HANDLE handles_ [2]

Detailed Description

Provides a portable bidirectional "pipe" abstraction.

This class is designed to work with select()-based demuxers, such as the ACE_Select_Reactor, which is why it uses sockets on Windows rather than Win32 pipes (which aren't select()'able).

Definition at line 44 of file Pipe.h.


Constructor & Destructor Documentation

ACE_Pipe::ACE_Pipe ( void   ) 

Default constructor (does nothing...).

Definition at line 227 of file Pipe.cpp.

{
  ACE_TRACE ("ACE_Pipe::ACE_Pipe");

  // No pipe is opened here; both ends simply start out marked invalid.
  for (int slot = 0; slot < 2; ++slot)
    this->handles_[slot] = ACE_INVALID_HANDLE;
}

ACE_Pipe::ACE_Pipe ( ACE_HANDLE  handles[2]  ) 

Open the pipe and initialize the handles.

Definition at line 235 of file Pipe.cpp.

{
  ACE_TRACE ("ACE_Pipe::ACE_Pipe");

  // Give both slots a defined value before trying to open: open()
  // leaves handles_ untouched when it fails, and a later close() or
  // read_handle()/write_handle() must not see indeterminate handles.
  this->handles_[0] = ACE_INVALID_HANDLE;
  this->handles_[1] = ACE_INVALID_HANDLE;

  // On success open() also copies the new handles into the caller's
  // array; on failure we can only log, since a constructor has no
  // return value.
  if (this->open (handles) == -1)
    ACE_ERROR ((LM_ERROR,
                ACE_TEXT ("ACE_Pipe::ACE_Pipe")));
}

ACE_Pipe::ACE_Pipe ( ACE_HANDLE  read,
ACE_HANDLE  write 
)

Initialize the ACE_Pipe from the read and write handles.

Definition at line 244 of file Pipe.cpp.

{
  ACE_TRACE ("ACE_Pipe::ACE_Pipe");

  // Adopt the caller's handles unchanged: slot 1 is the write side,
  // slot 0 the read side.  Nothing is opened or validated here.
  this->handles_[1] = write;
  this->handles_[0] = read;
}

ACE_Pipe::~ACE_Pipe ( void   ) 

Default dtor. It doesn't close the handles for you.

Definition at line 11 of file Pipe.inl.

{
  ACE_TRACE ("ACE_Pipe::~ACE_Pipe");
  // Intentionally empty: the destructor does not close the handles, so
  // a caller that wants them released has to invoke close() itself.
}


Member Function Documentation

int ACE_Pipe::close ( void   ) 

Close down the pipe HANDLEs.

Definition at line 253 of file Pipe.cpp.

{
  ACE_TRACE ("ACE_Pipe::close");

  // Accumulates the status of each close; stays 0 if nothing was open.
  int status = 0;

  // <ACE_OS::closesocket> is used for both ends; per the original note
  // it simply calls <::close> when the handles aren't sockets.
  for (int slot = 0; slot < 2; ++slot)
    {
      if (this->handles_[slot] != ACE_INVALID_HANDLE)
        status |= ACE_OS::closesocket (this->handles_[slot]);

      // Always mark the slot invalid so a second close() is a no-op.
      this->handles_[slot] = ACE_INVALID_HANDLE;
    }

  return status;
}

void ACE_Pipe::dump ( void   )  const

Dump the state of the object.

Definition at line 26 of file Pipe.cpp.

{
#if defined (ACE_HAS_DUMP)
  ACE_TRACE ("ACE_Pipe::dump");

  // Snapshot both handles, then print them between the dump markers.
  ACE_HANDLE const reader = this->handles_[0];
  ACE_HANDLE const writer = this->handles_[1];

  ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("handles_[0] = %d"), reader));
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("\nhandles_[1] = %d\n"), writer));
  ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
#endif /* ACE_HAS_DUMP */
}

int ACE_Pipe::open ( int  buffer_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ  ) 

Open the pipe, setting the buffer size to the maximum.

Definition at line 38 of file Pipe.cpp.

{
  // Create the pipe and store its two ends in handles_[0] (read side)
  // and handles_[1] (write side).  Returns 0 on success and -1 on
  // failure.  One of three platform-specific strategies is compiled in:
  //   1. ACE_LACKS_SOCKETPAIR: connect a socket to ourselves through
  //      ACE_LOCALHOST and use the two connected endpoints.
  //   2. ACE_HAS_STREAM_PIPES or __QNX__: use ACE_OS::pipe();
  //      <buffer_size> is ignored.
  //   3. Otherwise: use ACE_OS::socketpair().
  // Where the platform supports it, <buffer_size> is applied as
  // SO_RCVBUF on the read side and SO_SNDBUF on the write side.
  ACE_TRACE ("ACE_Pipe::open");

#if defined (ACE_LACKS_SOCKETPAIR)
  ACE_INET_Addr my_addr;
  ACE_SOCK_Acceptor acceptor;
  ACE_SOCK_Connector connector;
  ACE_SOCK_Stream reader;
  ACE_SOCK_Stream writer;
  int result = 0;
# if defined (ACE_WIN32)
  ACE_INET_Addr local_any  (static_cast<u_short> (0), ACE_LOCALHOST);
# else
  ACE_Addr local_any = ACE_Addr::sap_any;
# endif /* ACE_WIN32 */

  // Bind listener to any port and then find out what the port was.
  if (acceptor.open (local_any) == -1
      || acceptor.get_local_addr (my_addr) == -1)
    result = -1;
  else
    {
      ACE_INET_Addr sv_addr (my_addr.get_port_number (),
                             ACE_LOCALHOST);

      // Establish a connection within the same process.
      if (connector.connect (writer, sv_addr) == -1)
        result = -1;
      else if (acceptor.accept (reader) == -1)
        {
          // The connect side succeeded but the accept side did not, so
          // release the half-open writer before reporting failure.
          writer.close ();
          result = -1;
        }
    }

  // Close down the acceptor endpoint since we don't need it anymore.
  acceptor.close ();
  if (result == -1)
    return -1;

  // Record the two connected endpoints; from here on this->close ()
  // is what releases them on any later failure.
  this->handles_[0] = reader.get_handle ();
  this->handles_[1] = writer.get_handle ();

# if !defined (ACE_LACKS_TCP_NODELAY)
  int one = 1;

  // Make sure that the TCP stack doesn't try to buffer small writes.
  // Since this communication is purely local to the host it doesn't
  // affect network performance.

  if (writer.set_option (ACE_IPPROTO_TCP,
                         TCP_NODELAY,
                         &one,
                         sizeof one) == -1)
    {
      this->close ();
      return -1;
    }
# endif /* ! ACE_LACKS_TCP_NODELAY */

# if defined (ACE_LACKS_SO_RCVBUF) && defined (ACE_LACKS_SO_SNDBUF)
    ACE_UNUSED_ARG (buffer_size);
# endif
# if !defined (ACE_LACKS_SO_RCVBUF)
  // Size the receive buffer of the read side.  A failure with errno
  // ENOTSUP is tolerated; any other failure aborts the open.
  if (reader.set_option (SOL_SOCKET,
                         SO_RCVBUF,
                         reinterpret_cast <void *> (&buffer_size),
                         sizeof (buffer_size)) == -1
      && errno != ENOTSUP)
    {
      this->close ();
      return -1;
    }
# endif /* !ACE_LACKS_SO_RCVBUF */
# if !defined (ACE_LACKS_SO_SNDBUF)
  // Size the send buffer of the write side, with the same ENOTSUP
  // tolerance as above.
  if (writer.set_option (SOL_SOCKET,
                         SO_SNDBUF,
                         reinterpret_cast <void *> (&buffer_size),
                         sizeof (buffer_size)) == -1
           && errno != ENOTSUP)
    {
      this->close ();
      return -1;
    }
# endif /* !ACE_LACKS_SO_SNDBUF */

#elif defined (ACE_HAS_STREAM_PIPES) || defined (__QNX__)
  // This branch never uses the requested buffer size.
  ACE_UNUSED_ARG (buffer_size);
  if (ACE_OS::pipe (this->handles_) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("%p\n"),
                       ACE_TEXT ("pipe")),
                      -1);

#if !defined(__QNX__)
  int arg = RMSGN;

  // Enable "msg no discard" mode, which ensures that record
  // boundaries are maintained when messages are sent and received.
  // The mode is set on both ends; if either ioctl fails the pipe is
  // closed again before the error is returned.
  if (ACE_OS::ioctl (this->handles_[0],
                     I_SRDOPT,
                     (void *) arg) == -1
      || ACE_OS::ioctl (this->handles_[1],
                        I_SRDOPT,
                        (void *) arg) == -1)
    {
      this->close ();
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p\n"),
                         ACE_TEXT ("ioctl")), -1);
    }
#endif /* __QNX__ */

#else  /* ! ACE_LACKS_SOCKETPAIR && ! ACE_HAS_STREAM_PIPES */
  // socketpair() writes both ends straight into handles_.
  if (ACE_OS::socketpair (AF_UNIX,
                          SOCK_STREAM,
                          0,
                          this->handles_) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("%p\n"),
                       ACE_TEXT ("socketpair")),
                      -1);
# if defined (ACE_LACKS_SO_SNDBUF) && defined (ACE_LACKS_SO_RCVBUF)
  ACE_UNUSED_ARG (buffer_size);
# endif
# if !defined (ACE_LACKS_SO_RCVBUF)
  // Size the receive buffer of the read side.  A failure with errno
  // ENOTSUP is tolerated; any other failure closes the pair.
  if (ACE_OS::setsockopt (this->handles_[0],
                          SOL_SOCKET,
                          SO_RCVBUF,
                          reinterpret_cast <const char *> (&buffer_size),
                          sizeof (buffer_size)) == -1
      && errno != ENOTSUP)
    {
      this->close ();
      return -1;
    }
# endif
# if !defined (ACE_LACKS_SO_SNDBUF)
  // Size the send buffer of the write side, with the same ENOTSUP
  // tolerance as above.
  if (ACE_OS::setsockopt (this->handles_[1],
                          SOL_SOCKET,
                          SO_SNDBUF,
                          reinterpret_cast <const char *> (&buffer_size),
                          sizeof (buffer_size)) == -1
      && errno != ENOTSUP)
    {
      this->close ();
      return -1;
    }
# endif /* ! ACE_LACKS_SO_SNDBUF */
# if defined (ACE_OPENVMS) && !defined (ACE_LACKS_TCP_NODELAY)
  int one = 1;
  // OpenVMS implements socketpair(AF_UNIX...) by returning AF_INET sockets.
  // Since these are plagued by Nagle as any other INET socket we need to set
  // TCP_NODELAY on the write handle.
  if (ACE_OS::setsockopt (this->handles_[1],
                          ACE_IPPROTO_TCP,
                          TCP_NODELAY,
                          reinterpret_cast <const char *> (&one),
                          sizeof (one)) == -1)
    {
      this->close ();
      return -1;
    }
# endif /* ACE_OPENVMS && !ACE_LACKS_TCP_NODELAY */
#endif  /* ! ACE_LACKS_SOCKETPAIR && ! ACE_HAS_STREAM_PIPES */
  // Every branch above has by now stored the read end in handles_[0]
  // and the write end in handles_[1].

  return 0;
}

int ACE_Pipe::open ( ACE_HANDLE  handles[2]  ) 

Open the pipe and initialize the handles.

Definition at line 211 of file Pipe.cpp.

{
  ACE_TRACE ("ACE_Pipe::open");

  // Create the pipe with the default buffer size; on failure the
  // caller's array is left untouched.
  if (this->open () == -1)
    return -1;

  // Hand copies of both ends back to the caller.
  for (int slot = 0; slot < 2; ++slot)
    handles[slot] = this->handles_[slot];

  return 0;
}

ACE_HANDLE ACE_Pipe::read_handle ( void   )  const

This is the "read" side of the pipe. Note, however, that processes can also write to this handle, since pipes are bi-directional.

Definition at line 18 of file Pipe.inl.

{
  ACE_TRACE ("ACE_Pipe::read_handle");

  // Slot 0 holds the read side of the pipe.
  ACE_HANDLE const reader = this->handles_[0];
  return reader;
}

ssize_t ACE_Pipe::recv ( void *  buf,
size_t  n 
) const

Receive up to n bytes into buf.

Definition at line 137 of file Pipe.inl.

{
  ACE_TRACE ("ACE_Pipe::recv");

  // A single receive from the read side: recv() on Win32, read()
  // everywhere else.
  ACE_HANDLE const reader = this->read_handle ();
  char *const dest = static_cast <char *> (buf);
#if defined (ACE_WIN32)
  return ACE_OS::recv (reader, dest, n);
#else
  return ACE_OS::read (reader, dest, n);
#endif /* ACE_WIN32 */
}

ssize_t ACE_Pipe::recv ( void *  buf,
size_t  n,
ACE_OVERLAPPED *  overlapped 
) const

Recv n bytes via Win32 ReadFile using overlapped I/O.

Definition at line 180 of file Pipe.inl.

{
  ACE_TRACE ("ACE_Pipe::recv");

  // Forward to the overlapped flavour of ACE_OS::read() on the read side.
  ACE_HANDLE const reader = this->read_handle ();
  char *const dest = static_cast <char *> (buf);
  return ACE_OS::read (reader, dest, n, overlapped);
}

ssize_t ACE_Pipe::recv ( iovec  iov[],
int  n 
) const

Recv iovecs via <readv>.

Definition at line 159 of file Pipe.inl.

{
  ACE_TRACE ("ACE_Pipe::recv");

  // One scatter-read from the read side: recvv() on Win32, readv()
  // everywhere else.
  ACE_HANDLE const reader = this->read_handle ();
#if defined (ACE_WIN32)
  return ACE_OS::recvv (reader, iov, n);
#else
  return ACE_OS::readv (reader, iov, n);
#endif /* ACE_WIN32 */
}

ssize_t ACE_Pipe::recv ( size_t  n,
  ... 
) const

This is an interface to readv, that doesn't use the struct iovec explicitly. The ... can be passed as an arbitrary number of (char *ptr, int len) tuples. However, the count N is the *total* number of trailing arguments, *not* a count of the number of tuple pairs!

Definition at line 326 of file Pipe.cpp.

{
  ACE_TRACE ("ACE_Pipe::recv");

  // <n> counts every trailing argument, so half of it is the number of
  // (char *ptr, int len) pairs to gather into the iovec array.
  int const pair_count = ACE_Utils::truncate_cast<int> (n / 2);

  iovec *vec = 0;
#if defined (ACE_HAS_ALLOCA)
  // Stack allocation: released automatically when this function returns.
  vec = static_cast<iovec *> (alloca (pair_count * sizeof (iovec)));
#else
  ACE_NEW_RETURN (vec,
                  iovec[pair_count],
                  -1);
#endif /* !defined (ACE_HAS_ALLOCA) */

  // Unpack the variadic arguments pairwise: pointer first, then length.
  va_list args;
  va_start (args, n);

  for (int slot = 0; slot < pair_count; ++slot)
    {
      vec[slot].iov_base = va_arg (args, char *);
      vec[slot].iov_len  = va_arg (args, int);
    }

  va_end (args);

  // One scatter-read from the read side of the pipe.
#if defined (ACE_WIN32)
  ssize_t const received = ACE::recvv (this->read_handle (),
                                       vec,
                                       pair_count);
#else
  ssize_t const received = ACE_OS::readv (this->read_handle (),
                                          vec,
                                          pair_count);
#endif /* ACE_WIN32 */

#if !defined (ACE_HAS_ALLOCA)
  delete [] vec;
#endif /* !defined (ACE_HAS_ALLOCA) */
  return received;
}

ssize_t ACE_Pipe::recv_n ( void *  buf,
size_t  n 
) const

Recv n bytes, keep trying until n are received.

Definition at line 115 of file Pipe.inl.

{
  ACE_TRACE ("ACE_Pipe::recv_n");

  // Delegate to the ACE "_n" helper for the read side: recv_n() on
  // Win32, read_n() everywhere else.
  ACE_HANDLE const reader = this->read_handle ();
#if defined (ACE_WIN32)
  return ACE::recv_n (reader, buf, n);
#else
  return ACE::read_n (reader, buf, n);
#endif /* ACE_WIN32 */
}

ssize_t ACE_Pipe::recvv_n ( iovec  iov[],
int  n 
) const

Receive an iovec of size n from the pipe.

Definition at line 68 of file Pipe.inl.

{
  ACE_TRACE ("ACE_Pipe::recvv_n");
  // Performs one scatter-read from the read side of the pipe into the
  // <n> buffers described by <iov> and returns whatever the underlying
  // call returns.
  //
  // @@ Carlos, can you please update this to call the
  // new ACE::recvv_n() method that you write?
#if defined (ACE_WIN32)
  // This must be a receive: the previous code called ACE_OS::sendv()
  // here, which transmitted the caller's buffers through the read
  // handle instead of filling them.
  return ACE_OS::recvv (this->read_handle (),
                        iov,
                        n);
#else
  return ACE_OS::readv (this->read_handle (),
                        iov,
                        n);
#endif /* ACE_WIN32 */
}

ssize_t ACE_Pipe::send ( const void *  buf,
size_t  n,
ACE_OVERLAPPED *  overlapped 
) const

Send n bytes via Win32 WriteFile using overlapped I/O.

Definition at line 170 of file Pipe.inl.

{
  ACE_TRACE ("ACE_Pipe::send");

  // Forward to the overlapped flavour of ACE_OS::write() on the write side.
  ACE_HANDLE const writer = this->write_handle ();
  const char *const src = static_cast <const char *> (buf);
  return ACE_OS::write (writer, src, n, overlapped);
}

ssize_t ACE_Pipe::send ( const void *  buf,
size_t  n 
) const

Send up to n bytes from buf.

Definition at line 126 of file Pipe.inl.

{
  ACE_TRACE ("ACE_Pipe::send");

  // A single send on the write side: send() on Win32, write()
  // everywhere else.
  ACE_HANDLE const writer = this->write_handle ();
  const char *const src = static_cast <const char *> (buf);
#if defined (ACE_WIN32)
  return ACE_OS::send (writer, src, n);
#else
  return ACE_OS::write (writer, src, n);
#endif /* ACE_WIN32 */
}

ssize_t ACE_Pipe::send ( const iovec  iov[],
int  n 
) const

Send iovecs via <writev>.

Definition at line 148 of file Pipe.inl.

{
  ACE_TRACE ("ACE_Pipe::send");

  // One gather-write on the write side: sendv() on Win32, writev()
  // everywhere else.
  ACE_HANDLE const writer = this->write_handle ();
#if defined (ACE_WIN32)
  return ACE_OS::sendv (writer, iov, n);
#else
  return ACE_OS::writev (writer, iov, n);
#endif /* ACE_WIN32 */
}

ssize_t ACE_Pipe::send ( size_t  n,
  ... 
) const

Send N char *ptrs and int lengths. Note that the char *'s precede the ints (basically, a varargs version of writev). The count N is the *total* number of trailing arguments, *not* a count of the number of tuple pairs!

Definition at line 280 of file Pipe.cpp.

{
  ACE_TRACE ("ACE_Pipe::send");

  // <n> counts every trailing argument, so half of it is the number of
  // (char *ptr, int len) pairs to gather into the iovec array.
  int const pair_count = ACE_Utils::truncate_cast<int> (n / 2);

  iovec *vec = 0;
#if defined (ACE_HAS_ALLOCA)
  // Stack allocation: released automatically when this function returns.
  vec = static_cast<iovec *> (alloca (pair_count * sizeof (iovec)));
#else
  ACE_NEW_RETURN (vec,
                  iovec[pair_count],
                  -1);
#endif /* !defined (ACE_HAS_ALLOCA) */

  // Unpack the variadic arguments pairwise: pointer first, then length.
  va_list args;
  va_start (args, n);

  for (int slot = 0; slot < pair_count; ++slot)
    {
      vec[slot].iov_base = va_arg (args, char *);
      vec[slot].iov_len  = va_arg (args, int);
    }

  va_end (args);

  // One gather-write on the write side of the pipe.
#if defined (ACE_WIN32)
  ssize_t sent = ACE::sendv (this->write_handle (),
                             vec,
                             pair_count);
#else
  ssize_t sent = ACE_OS::writev (this->write_handle (),
                                 vec,
                                 pair_count);
#endif /* ACE_WIN32 */

#if !defined (ACE_HAS_ALLOCA)
  delete [] vec;
#endif /* !defined (ACE_HAS_ALLOCA) */
  return sent;
}

ssize_t ACE_Pipe::send_n ( const void *  buf,
size_t  n 
) const

Send n bytes, keep trying until n are sent.

Definition at line 101 of file Pipe.inl.

{
  ACE_TRACE ("ACE_Pipe::send_n");

  // Delegate to the ACE "_n" helper for the write side: send_n() on
  // Win32, write_n() everywhere else.
  ACE_HANDLE const writer = this->write_handle ();
#if defined (ACE_WIN32)
  return ACE::send_n (writer, buf, n);
#else
  return ACE::write_n (writer, buf, n);
#endif /* ACE_WIN32 */
}

ssize_t ACE_Pipe::send_n ( const ACE_Message_Block *  message_block,
const ACE_Time_Value *  timeout = 0,
size_t *  bytes_transferred = 0 
)

Send all the message_blocks chained through their <next> and <cont> pointers. This call uses the underlying OS gather-write operation to reduce the domain-crossing penalty.

Definition at line 47 of file Pipe.inl.

{
  ACE_TRACE ("ACE_Pipe::send_n");

  ACE_HANDLE const writer = this->write_handle ();
#if defined (ACE_WIN32)
  return ACE::send_n (writer, message_block, timeout, bytes_transferred);
#else
  // The write_n() call below takes no timeout, so <timeout> is ignored
  // on this path.
  ACE_UNUSED_ARG (timeout);
  return ACE::write_n (writer, message_block, bytes_transferred);
#endif /* ACE_WIN32 */
}

ssize_t ACE_Pipe::sendv ( const iovec  iov[],
int  n 
) const

Send an <iovec> of size n to the file.

Definition at line 87 of file Pipe.inl.

{
  ACE_TRACE ("ACE_Pipe::sendv");

  // One gather-write on the write side: sendv() on Win32, writev()
  // everywhere else.
  ACE_HANDLE const writer = this->write_handle ();
#if defined (ACE_WIN32)
  return ACE_OS::sendv (writer, iov, n);
#else
  return ACE_OS::writev (writer, iov, n);
#endif /* ACE_WIN32 */
}

ssize_t ACE_Pipe::sendv_n ( const iovec  iov[],
int  n 
) const

Send an iovec of size n to the file. Will block until all bytes are sent or an error occurs.

Definition at line 32 of file Pipe.inl.

{
  ACE_TRACE ("ACE_Pipe::sendv_n");

  // Delegate to the ACE "_n" gather-write helper for the write side:
  // sendv_n() on Win32, writev_n() everywhere else.
  ACE_HANDLE const writer = this->write_handle ();
#if defined (ACE_WIN32)
  return ACE::sendv_n (writer, iov, n);
#else
  return ACE::writev_n (writer, iov, n);
#endif /* ACE_WIN32 */
}

ACE_HANDLE ACE_Pipe::write_handle ( void   )  const

This is the "write" side of the pipe. Note, however, that processes can also read from this handle, since pipes are bi-directional.

Definition at line 25 of file Pipe.inl.

{
  ACE_TRACE ("ACE_Pipe::write_handle");

  // Slot 1 holds the write side of the pipe.
  ACE_HANDLE const writer = this->handles_[1];
  return writer;
}


Member Data Documentation

ACE_HANDLE ACE_Pipe::handles_[2] [private]

Definition at line 154 of file Pipe.h.


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines