Pipe.cpp

Go to the documentation of this file.
00001 // Pipe.cpp,v 4.50 2006/01/23 14:02:02 schmidt Exp
00002 
00003 #include "ace/Pipe.h"
00004 #include "ace/SOCK_Acceptor.h"
00005 #include "ace/SOCK_Connector.h"
00006 #include "ace/Log_Msg.h"
00007 #include "ace/OS_NS_sys_socket.h"
00008 #include "ace/OS_Memory.h"
00009 
00010 #if defined (ACE_HAS_STREAM_PIPES) || defined (__QNX__)
00011 #  include "ace/OS_NS_unistd.h"
00012 #endif  // ACE_HAS_STREAM_PIPES || __QNX__
00013 
00014 #include "ace/os_include/netinet/os_tcp.h"
00015 
00016 #if !defined (__ACE_INLINE__)
00017 #include "ace/Pipe.inl"
00018 #endif /* __ACE_INLINE__ */
00019 
00020 ACE_RCSID(ace, Pipe, "Pipe.cpp,v 4.50 2006/01/23 14:02:02 schmidt Exp")
00021 
00022 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00023 
00024 void
00025 ACE_Pipe::dump (void) const
00026 {
00027 #if defined (ACE_HAS_DUMP)
00028   ACE_TRACE ("ACE_Pipe::dump");
00029   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00030   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("handles_[0] = %d"), this->handles_[0]));
00031   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nhandles_[1] = %d"), this->handles_[1]));
00032   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\n")));
00033   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00034 #endif /* ACE_HAS_DUMP */
00035 }
00036 
00037 int
00038 ACE_Pipe::open (int buffer_size)
00039 {
00040   ACE_TRACE ("ACE_Pipe::open");
00041 
00042 #if defined (ACE_LACKS_SOCKETPAIR) || defined (__Lynx__)
00043   ACE_INET_Addr my_addr;
00044   ACE_SOCK_Acceptor acceptor;
00045   ACE_SOCK_Connector connector;
00046   ACE_SOCK_Stream reader;
00047   ACE_SOCK_Stream writer;
00048   int result = 0;
00049 # if defined (ACE_WIN32)
00050   ACE_INET_Addr local_any  (static_cast<u_short> (0), ACE_LOCALHOST);
00051 # else
00052   ACE_Addr local_any = ACE_Addr::sap_any;
00053 # endif /* ACE_WIN32 */
00054 
00055   // Bind listener to any port and then find out what the port was.
00056   if (acceptor.open (local_any) == -1
00057       || acceptor.get_local_addr (my_addr) == -1)
00058     result = -1;
00059   else
00060     {
00061       ACE_INET_Addr sv_addr (my_addr.get_port_number (),
00062                              ACE_LOCALHOST);
00063 
00064       // Establish a connection within the same process.
00065       if (connector.connect (writer, sv_addr) == -1)
00066         result = -1;
00067       else if (acceptor.accept (reader) == -1)
00068         {
00069           writer.close ();
00070           result = -1;
00071         }
00072     }
00073 
00074   // Close down the acceptor endpoint since we don't need it anymore.
00075   acceptor.close ();
00076   if (result == -1)
00077     return -1;
00078 
00079   this->handles_[0] = reader.get_handle ();
00080   this->handles_[1] = writer.get_handle ();
00081 
00082 # if !defined (ACE_LACKS_TCP_NODELAY)
00083   int one = 1;
00084 
00085   // Make sure that the TCP stack doesn't try to buffer small writes.
00086   // Since this communication is purely local to the host it doesn't
00087   // affect network performance.
00088 
00089   if (writer.set_option (ACE_IPPROTO_TCP,
00090                          TCP_NODELAY,
00091                          &one,
00092                          sizeof one) == -1)
00093     {
00094       this->close ();
00095       return -1;
00096     }
00097 # endif /* ! ACE_LACKS_TCP_NODELAY */
00098 
00099 # if defined (ACE_LACKS_SOCKET_BUFSIZ)
00100     ACE_UNUSED_ARG (buffer_size);
00101 # else  /* ! ACE_LACKS_SOCKET_BUFSIZ */
00102   if (reader.set_option (SOL_SOCKET,
00103                          SO_RCVBUF,
00104                          reinterpret_cast <void *> (&buffer_size),
00105                          sizeof (buffer_size)) == -1
00106       && errno != ENOTSUP)
00107     {
00108       this->close ();
00109       return -1;
00110     }
00111   else if (writer.set_option (SOL_SOCKET,
00112                               SO_SNDBUF,
00113                               reinterpret_cast <void *> (&buffer_size),
00114                               sizeof (buffer_size)) == -1
00115            && errno != ENOTSUP)
00116     {
00117       this->close ();
00118       return -1;
00119     }
00120 # endif /* ! ACE_LACKS_SOCKET_BUFSIZ */
00121 
00122 #elif defined (ACE_HAS_STREAM_PIPES) || defined (__QNX__)
00123   ACE_UNUSED_ARG (buffer_size);
00124   if (ACE_OS::pipe (this->handles_) == -1)
00125     ACE_ERROR_RETURN ((LM_ERROR,
00126                        ACE_LIB_TEXT ("%p\n"),
00127                        ACE_LIB_TEXT ("pipe")),
00128                       -1);
00129 
00130 #if !defined(__QNX__)
00131   int arg = RMSGN;
00132 
00133   // Enable "msg no discard" mode, which ensures that record
00134   // boundaries are maintained when messages are sent and received.
00135   if (ACE_OS::ioctl (this->handles_[0],
00136                      I_SRDOPT,
00137                      (void *) arg) == -1
00138       || ACE_OS::ioctl (this->handles_[1],
00139                         I_SRDOPT,
00140                         (void *) arg) == -1)
00141     {
00142       this->close ();
00143       ACE_ERROR_RETURN ((LM_ERROR,
00144                          ACE_LIB_TEXT ("%p\n"),
00145                          ACE_LIB_TEXT ("ioctl")), -1);
00146     }
00147 #endif /* __QNX__ */
00148 
00149 #else  /* ! ACE_LACKS_SOCKETPAIR && ! ACE_HAS_STREAM_PIPES */
00150   if (ACE_OS::socketpair (AF_UNIX,
00151                           SOCK_STREAM,
00152                           0,
00153                           this->handles_) == -1)
00154     ACE_ERROR_RETURN ((LM_ERROR,
00155                        ACE_LIB_TEXT ("%p\n"),
00156                        ACE_LIB_TEXT ("socketpair")),
00157                       -1);
00158 # if defined (ACE_LACKS_SOCKET_BUFSIZ)
00159   ACE_UNUSED_ARG (buffer_size);
00160 # else  /* ! ACE_LACKS_SOCKET_BUFSIZ */
00161   if (ACE_OS::setsockopt (this->handles_[0],
00162                           SOL_SOCKET,
00163                           SO_RCVBUF,
00164                           reinterpret_cast <const char *> (&buffer_size),
00165                           sizeof (buffer_size)) == -1
00166       && errno != ENOTSUP)
00167     {
00168       this->close ();
00169       return -1;
00170     }
00171   if (ACE_OS::setsockopt (this->handles_[1],
00172                           SOL_SOCKET,
00173                           SO_SNDBUF,
00174                           reinterpret_cast <const char *> (&buffer_size),
00175                           sizeof (buffer_size)) == -1
00176       && errno != ENOTSUP)
00177     {
00178       this->close ();
00179       return -1;
00180     }
00181 # endif /* ! ACE_LACKS_SOCKET_BUFSIZ */
00182 #endif  /* ! ACE_LACKS_SOCKETPAIR && ! ACE_HAS_STREAM_PIPES */
00183   // Point both the read and write HANDLES to the appropriate socket
00184   // HANDLEs.
00185 
00186   return 0;
00187 }
00188 
00189 int
00190 ACE_Pipe::open (ACE_HANDLE handles[2])
00191 {
00192   ACE_TRACE ("ACE_Pipe::open");
00193 
00194   if (this->open () == -1)
00195     return -1;
00196   else
00197     {
00198       handles[0] = this->handles_[0];
00199       handles[1] = this->handles_[1];
00200       return 0;
00201     }
00202 }
00203 
00204 // Do nothing...
00205 
00206 ACE_Pipe::ACE_Pipe (void)
00207 {
00208   ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00209 
00210   this->handles_[0] = ACE_INVALID_HANDLE;
00211   this->handles_[1] = ACE_INVALID_HANDLE;
00212 }
00213 
00214 ACE_Pipe::ACE_Pipe (ACE_HANDLE handles[2])
00215 {
00216   ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00217 
00218   if (this->open (handles) == -1)
00219     ACE_ERROR ((LM_ERROR,
00220                 ACE_LIB_TEXT ("ACE_Pipe::ACE_Pipe")));
00221 }
00222 
00223 ACE_Pipe::ACE_Pipe (ACE_HANDLE read,
00224                     ACE_HANDLE write)
00225 {
00226   ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00227   this->handles_[0] = read;
00228   this->handles_[1] = write;
00229 }
00230 
00231 int
00232 ACE_Pipe::close (void)
00233 {
00234   ACE_TRACE ("ACE_Pipe::close");
00235 
00236   int result = 0;
00237 
00238   // Note that the following will work even if we aren't closing down
00239   // sockets because <ACE_OS::closesocket> will just call <::close> in
00240   // that case!
00241 
00242   if (this->handles_[0] != ACE_INVALID_HANDLE)
00243     result = ACE_OS::closesocket (this->handles_[0]);
00244   this->handles_[0] = ACE_INVALID_HANDLE;
00245 
00246   if (this->handles_[1] != ACE_INVALID_HANDLE)
00247     result |= ACE_OS::closesocket (this->handles_[1]);
00248   this->handles_[1] = ACE_INVALID_HANDLE;
00249 
00250   return result;
00251 }
00252 
00253 // Send N char *ptrs and int lengths.  Note that the char *'s precede
00254 // the ints (basically, an varargs version of writev).  The count N is
00255 // the *total* number of trailing arguments, *not* a couple of the
00256 // number of tuple pairs!
00257 
00258 ssize_t
00259 ACE_Pipe::send (size_t n, ...) const
00260 {
00261   ACE_TRACE ("ACE_Pipe::send");
00262   va_list argp;
00263   size_t total_tuples = n / 2;
00264   iovec *iovp;
00265 #if defined (ACE_HAS_ALLOCA)
00266   iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00267 #else
00268   ACE_NEW_RETURN (iovp,
00269                   iovec[total_tuples],
00270                   -1);
00271 #endif /* !defined (ACE_HAS_ALLOCA) */
00272 
00273   va_start (argp, n);
00274 
00275   for (size_t i = 0; i < total_tuples; i++)
00276     {
00277       iovp[i].iov_base = va_arg (argp, char *);
00278       iovp[i].iov_len  = va_arg (argp, int);
00279     }
00280 
00281 #if defined (ACE_WIN32)
00282   ssize_t result = ACE::sendv (this->write_handle (),
00283                                iovp,
00284                                total_tuples);
00285 #else
00286   ssize_t result = ACE_OS::writev (this->write_handle (),
00287                                    iovp,
00288                                    total_tuples);
00289 #endif /* ACE_WIN32 */
00290 
00291 #if !defined (ACE_HAS_ALLOCA)
00292   delete [] iovp;
00293 #endif /* !defined (ACE_HAS_ALLOCA) */
00294   va_end (argp);
00295   return result;
00296 }
00297 
00298 // This is basically an interface to ACE_OS::readv, that doesn't use
00299 // the struct iovec explicitly.  The ... can be passed as an arbitrary
00300 // number of (char *ptr, int len) tuples.  However, the count N is the
00301 // *total* number of trailing arguments, *not* a couple of the number
00302 // of tuple pairs!
00303 
00304 ssize_t
00305 ACE_Pipe::recv (size_t n, ...) const
00306 {
00307   ACE_TRACE ("ACE_Pipe::recv");
00308   va_list argp;
00309   size_t total_tuples = n / 2;
00310   iovec *iovp;
00311 #if defined (ACE_HAS_ALLOCA)
00312   iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00313 #else
00314   ACE_NEW_RETURN (iovp,
00315                   iovec[total_tuples],
00316                   -1);
00317 #endif /* !defined (ACE_HAS_ALLOCA) */
00318 
00319   va_start (argp, n);
00320 
00321   for (size_t i = 0; i < total_tuples; i++)
00322     {
00323       iovp[i].iov_base = va_arg (argp, char *);
00324       iovp[i].iov_len  = va_arg (argp, int);
00325     }
00326 
00327 #if defined (ACE_WIN32)
00328   ssize_t result = ACE::recvv (this->read_handle (),
00329                                iovp,
00330                                total_tuples);
00331 #else
00332   ssize_t result = ACE_OS::readv (this->read_handle (),
00333                                   iovp,
00334                                   total_tuples);
00335 #endif /* ACE_WIN32 */
00336 
00337 #if !defined (ACE_HAS_ALLOCA)
00338   delete [] iovp;
00339 #endif /* !defined (ACE_HAS_ALLOCA) */
00340   va_end (argp);
00341   return result;
00342 }
00343 
00344 ACE_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 09:41:59 2006 for ACE by doxygen 1.3.6