Pipe.cpp

Go to the documentation of this file.
00001 // $Id: Pipe.cpp 79134 2007-07-31 18:23:50Z johnnyw $
00002 
00003 #include "ace/Pipe.h"
00004 #include "ace/SOCK_Acceptor.h"
00005 #include "ace/SOCK_Connector.h"
00006 #include "ace/Log_Msg.h"
00007 #include "ace/OS_NS_sys_socket.h"
00008 #include "ace/OS_Memory.h"
00009 #include "ace/Truncate.h"
00010 
00011 #if defined (ACE_HAS_STREAM_PIPES) || defined (__QNX__)
00012 #  include "ace/OS_NS_unistd.h"
00013 #endif  // ACE_HAS_STREAM_PIPES || __QNX__
00014 
00015 #include "ace/os_include/netinet/os_tcp.h"
00016 
00017 #if !defined (__ACE_INLINE__)
00018 #include "ace/Pipe.inl"
00019 #endif /* __ACE_INLINE__ */
00020 
00021 ACE_RCSID(ace, Pipe, "$Id: Pipe.cpp 79134 2007-07-31 18:23:50Z johnnyw $")
00022 
00023 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00024 
00025 void
00026 ACE_Pipe::dump (void) const
00027 {
00028 #if defined (ACE_HAS_DUMP)
00029   ACE_TRACE ("ACE_Pipe::dump");
00030   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00031   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("handles_[0] = %d"), this->handles_[0]));
00032   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("\nhandles_[1] = %d"), this->handles_[1]));
00033   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("\n")));
00034   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00035 #endif /* ACE_HAS_DUMP */
00036 }
00037 
00038 int
00039 ACE_Pipe::open (int buffer_size)
00040 {
00041   ACE_TRACE ("ACE_Pipe::open");
00042 
00043 #if defined (ACE_LACKS_SOCKETPAIR) || defined (__Lynx__)
00044   ACE_INET_Addr my_addr;
00045   ACE_SOCK_Acceptor acceptor;
00046   ACE_SOCK_Connector connector;
00047   ACE_SOCK_Stream reader;
00048   ACE_SOCK_Stream writer;
00049   int result = 0;
00050 # if defined (ACE_WIN32)
00051   ACE_INET_Addr local_any  (static_cast<u_short> (0), ACE_LOCALHOST);
00052 # else
00053   ACE_Addr local_any = ACE_Addr::sap_any;
00054 # endif /* ACE_WIN32 */
00055 
00056   // Bind listener to any port and then find out what the port was.
00057   if (acceptor.open (local_any) == -1
00058       || acceptor.get_local_addr (my_addr) == -1)
00059     result = -1;
00060   else
00061     {
00062       ACE_INET_Addr sv_addr (my_addr.get_port_number (),
00063                              ACE_LOCALHOST);
00064 
00065       // Establish a connection within the same process.
00066       if (connector.connect (writer, sv_addr) == -1)
00067         result = -1;
00068       else if (acceptor.accept (reader) == -1)
00069         {
00070           writer.close ();
00071           result = -1;
00072         }
00073     }
00074 
00075   // Close down the acceptor endpoint since we don't need it anymore.
00076   acceptor.close ();
00077   if (result == -1)
00078     return -1;
00079 
00080   this->handles_[0] = reader.get_handle ();
00081   this->handles_[1] = writer.get_handle ();
00082 
00083 # if !defined (ACE_LACKS_TCP_NODELAY)
00084   int one = 1;
00085 
00086   // Make sure that the TCP stack doesn't try to buffer small writes.
00087   // Since this communication is purely local to the host it doesn't
00088   // affect network performance.
00089 
00090   if (writer.set_option (ACE_IPPROTO_TCP,
00091                          TCP_NODELAY,
00092                          &one,
00093                          sizeof one) == -1)
00094     {
00095       this->close ();
00096       return -1;
00097     }
00098 # endif /* ! ACE_LACKS_TCP_NODELAY */
00099 
00100 # if defined (ACE_LACKS_SOCKET_BUFSIZ)
00101     ACE_UNUSED_ARG (buffer_size);
00102 # else  /* ! ACE_LACKS_SOCKET_BUFSIZ */
00103   if (reader.set_option (SOL_SOCKET,
00104                          SO_RCVBUF,
00105                          reinterpret_cast <void *> (&buffer_size),
00106                          sizeof (buffer_size)) == -1
00107       && errno != ENOTSUP)
00108     {
00109       this->close ();
00110       return -1;
00111     }
00112   else if (writer.set_option (SOL_SOCKET,
00113                               SO_SNDBUF,
00114                               reinterpret_cast <void *> (&buffer_size),
00115                               sizeof (buffer_size)) == -1
00116            && errno != ENOTSUP)
00117     {
00118       this->close ();
00119       return -1;
00120     }
00121 # endif /* ! ACE_LACKS_SOCKET_BUFSIZ */
00122 
00123 #elif defined (ACE_HAS_STREAM_PIPES) || defined (__QNX__)
00124   ACE_UNUSED_ARG (buffer_size);
00125   if (ACE_OS::pipe (this->handles_) == -1)
00126     ACE_ERROR_RETURN ((LM_ERROR,
00127                        ACE_TEXT ("%p\n"),
00128                        ACE_TEXT ("pipe")),
00129                       -1);
00130 
00131 #if !defined(__QNX__)
00132   int arg = RMSGN;
00133 
00134   // Enable "msg no discard" mode, which ensures that record
00135   // boundaries are maintained when messages are sent and received.
00136   if (ACE_OS::ioctl (this->handles_[0],
00137                      I_SRDOPT,
00138                      (void *) arg) == -1
00139       || ACE_OS::ioctl (this->handles_[1],
00140                         I_SRDOPT,
00141                         (void *) arg) == -1)
00142     {
00143       this->close ();
00144       ACE_ERROR_RETURN ((LM_ERROR,
00145                          ACE_TEXT ("%p\n"),
00146                          ACE_TEXT ("ioctl")), -1);
00147     }
00148 #endif /* __QNX__ */
00149 
00150 #else  /* ! ACE_LACKS_SOCKETPAIR && ! ACE_HAS_STREAM_PIPES */
00151   if (ACE_OS::socketpair (AF_UNIX,
00152                           SOCK_STREAM,
00153                           0,
00154                           this->handles_) == -1)
00155     ACE_ERROR_RETURN ((LM_ERROR,
00156                        ACE_TEXT ("%p\n"),
00157                        ACE_TEXT ("socketpair")),
00158                       -1);
00159 # if defined (ACE_LACKS_SOCKET_BUFSIZ)
00160   ACE_UNUSED_ARG (buffer_size);
00161 # else  /* ! ACE_LACKS_SOCKET_BUFSIZ */
00162   if (ACE_OS::setsockopt (this->handles_[0],
00163                           SOL_SOCKET,
00164                           SO_RCVBUF,
00165                           reinterpret_cast <const char *> (&buffer_size),
00166                           sizeof (buffer_size)) == -1
00167       && errno != ENOTSUP)
00168     {
00169       this->close ();
00170       return -1;
00171     }
00172   if (ACE_OS::setsockopt (this->handles_[1],
00173                           SOL_SOCKET,
00174                           SO_SNDBUF,
00175                           reinterpret_cast <const char *> (&buffer_size),
00176                           sizeof (buffer_size)) == -1
00177       && errno != ENOTSUP)
00178     {
00179       this->close ();
00180       return -1;
00181     }
00182 # endif /* ! ACE_LACKS_SOCKET_BUFSIZ */
00183 #endif  /* ! ACE_LACKS_SOCKETPAIR && ! ACE_HAS_STREAM_PIPES */
00184   // Point both the read and write HANDLES to the appropriate socket
00185   // HANDLEs.
00186 
00187   return 0;
00188 }
00189 
00190 int
00191 ACE_Pipe::open (ACE_HANDLE handles[2])
00192 {
00193   ACE_TRACE ("ACE_Pipe::open");
00194 
00195   if (this->open () == -1)
00196     return -1;
00197   else
00198     {
00199       handles[0] = this->handles_[0];
00200       handles[1] = this->handles_[1];
00201       return 0;
00202     }
00203 }
00204 
00205 // Do nothing...
00206 
00207 ACE_Pipe::ACE_Pipe (void)
00208 {
00209   ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00210 
00211   this->handles_[0] = ACE_INVALID_HANDLE;
00212   this->handles_[1] = ACE_INVALID_HANDLE;
00213 }
00214 
00215 ACE_Pipe::ACE_Pipe (ACE_HANDLE handles[2])
00216 {
00217   ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00218 
00219   if (this->open (handles) == -1)
00220     ACE_ERROR ((LM_ERROR,
00221                 ACE_TEXT ("ACE_Pipe::ACE_Pipe")));
00222 }
00223 
00224 ACE_Pipe::ACE_Pipe (ACE_HANDLE read,
00225                     ACE_HANDLE write)
00226 {
00227   ACE_TRACE ("ACE_Pipe::ACE_Pipe");
00228   this->handles_[0] = read;
00229   this->handles_[1] = write;
00230 }
00231 
00232 int
00233 ACE_Pipe::close (void)
00234 {
00235   ACE_TRACE ("ACE_Pipe::close");
00236 
00237   int result = 0;
00238 
00239   // Note that the following will work even if we aren't closing down
00240   // sockets because <ACE_OS::closesocket> will just call <::close> in
00241   // that case!
00242 
00243   if (this->handles_[0] != ACE_INVALID_HANDLE)
00244     result = ACE_OS::closesocket (this->handles_[0]);
00245   this->handles_[0] = ACE_INVALID_HANDLE;
00246 
00247   if (this->handles_[1] != ACE_INVALID_HANDLE)
00248     result |= ACE_OS::closesocket (this->handles_[1]);
00249   this->handles_[1] = ACE_INVALID_HANDLE;
00250 
00251   return result;
00252 }
00253 
00254 // Send N char *ptrs and int lengths.  Note that the char *'s precede
00255 // the ints (basically, an varargs version of writev).  The count N is
00256 // the *total* number of trailing arguments, *not* a couple of the
00257 // number of tuple pairs!
00258 
00259 ssize_t
00260 ACE_Pipe::send (size_t n, ...) const
00261 {
00262   ACE_TRACE ("ACE_Pipe::send");
00263   va_list argp;
00264   int total_tuples = ACE_Utils::truncate_cast<int> (n / 2);
00265   iovec *iovp;
00266 #if defined (ACE_HAS_ALLOCA)
00267   iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00268 #else
00269   ACE_NEW_RETURN (iovp,
00270                   iovec[total_tuples],
00271                   -1);
00272 #endif /* !defined (ACE_HAS_ALLOCA) */
00273 
00274   va_start (argp, n);
00275 
00276   for (int i = 0; i < total_tuples; ++i)
00277     {
00278       iovp[i].iov_base = va_arg (argp, char *);
00279       iovp[i].iov_len  = va_arg (argp, int);
00280     }
00281 
00282 #if defined (ACE_WIN32)
00283   ssize_t result = ACE::sendv (this->write_handle (),
00284                                iovp,
00285                                total_tuples);
00286 #else
00287   ssize_t result = ACE_OS::writev (this->write_handle (),
00288                                    iovp,
00289                                    total_tuples);
00290 #endif /* ACE_WIN32 */
00291 
00292 #if !defined (ACE_HAS_ALLOCA)
00293   delete [] iovp;
00294 #endif /* !defined (ACE_HAS_ALLOCA) */
00295   va_end (argp);
00296   return result;
00297 }
00298 
00299 // This is basically an interface to ACE_OS::readv, that doesn't use
00300 // the struct iovec explicitly.  The ... can be passed as an arbitrary
00301 // number of (char *ptr, int len) tuples.  However, the count N is the
00302 // *total* number of trailing arguments, *not* a couple of the number
00303 // of tuple pairs!
00304 
00305 ssize_t
00306 ACE_Pipe::recv (size_t n, ...) const
00307 {
00308   ACE_TRACE ("ACE_Pipe::recv");
00309   va_list argp;
00310   int total_tuples = ACE_Utils::truncate_cast<int> (n / 2);
00311   iovec *iovp;
00312 #if defined (ACE_HAS_ALLOCA)
00313   iovp = (iovec *) alloca (total_tuples * sizeof (iovec));
00314 #else
00315   ACE_NEW_RETURN (iovp,
00316                   iovec[total_tuples],
00317                   -1);
00318 #endif /* !defined (ACE_HAS_ALLOCA) */
00319 
00320   va_start (argp, n);
00321 
00322   for (int i = 0; i < total_tuples; ++i)
00323     {
00324       iovp[i].iov_base = va_arg (argp, char *);
00325       iovp[i].iov_len  = va_arg (argp, int);
00326     }
00327 
00328 #if defined (ACE_WIN32)
00329   ssize_t result = ACE::recvv (this->read_handle (),
00330                                iovp,
00331                                total_tuples);
00332 #else
00333   ssize_t result = ACE_OS::readv (this->read_handle (),
00334                                   iovp,
00335                                   total_tuples);
00336 #endif /* ACE_WIN32 */
00337 
00338 #if !defined (ACE_HAS_ALLOCA)
00339   delete [] iovp;
00340 #endif /* !defined (ACE_HAS_ALLOCA) */
00341   va_end (argp);
00342   return result;
00343 }
00344 
00345 ACE_END_VERSIONED_NAMESPACE_DECL

Generated on Sun Jan 27 12:05:34 2008 for ACE by doxygen 1.3.6