
ACE_Streambuf Class Reference

Create your custom streambuf by providing an ACE_*_Stream object to this template. I have tested it with ACE_SOCK_Stream and it should work fine for others as well.

#include <IOStream.h>


Public Member Functions

virtual ~ACE_Streambuf (void)
ACE_Time_Value * recv_timeout (ACE_Time_Value *tv=0)
 Get the current Time_Value pointer and provide a new one.
char * reset_put_buffer (char *newBuffer=0, u_int _streambuf_size=0, u_int _pptr=0)
u_int put_avail (void)
char * reset_get_buffer (char *newBuffer=0, u_int _streambuf_size=0, u_int _gptr=0, u_int _egptr=0)
u_int get_waiting (void)
u_int get_avail (void)
u_int streambuf_size (void)
 Query the streambuf for the size of its buffers.
u_char timeout (void)

Protected Member Functions

 ACE_Streambuf (u_int streambuf_size, int io_mode)
virtual int sync (void)
virtual int underflow (void)
virtual int overflow (int c=EOF)
void reset_base (void)
int syncin (void)
int syncout (void)
int flushbuf (void)
int fillbuf (void)
virtual int get_one_byte (void)
virtual ssize_t send (char *buf, ssize_t len)=0
virtual ssize_t recv (char *buf, ssize_t len, ACE_Time_Value *tv=0)=0
virtual ssize_t recv (char *buf, ssize_t len, int flags, ACE_Time_Value *tv=0)=0
virtual ssize_t recv_n (char *buf, ssize_t len, int flags=0, ACE_Time_Value *tv=0)=0
virtual ACE_HANDLE get_handle (void)

Protected Attributes

char * eback_saved_
char * gptr_saved_
char * egptr_saved_
char * pbase_saved_
char * pptr_saved_
char * epptr_saved_
u_char cur_mode_
const u_char get_mode_
const u_char put_mode_
int mode_
const u_int streambuf_size_
u_char timeout_
 Did we take an error because of an IO operation timeout?
ACE_Time_Value recv_timeout_value_
ACE_Time_Value * recv_timeout_

Detailed Description

Create your custom streambuf by providing an ACE_*_Stream object to this template. I have tested it with ACE_SOCK_Stream and it should work fine for others as well.

For any iostream object, the real work is done by the underlying streambuf class. That is what we create here.

A streambuf has an internal buffer area into which data is read and written as the iostream requests and provides data. At some point during the read process, the iostream will realize that the streambuf has no more data, and the underflow function of the streambuf is then called. Likewise, during the write process, the iostream will eventually notice that the streambuf's buffer has become full and will invoke the overflow function.

The empty/full state of the read/write "buffers" is controlled by two sets of pointers. One set is dedicated to reading, the other to writing. These pointers, in turn, reference a common buffer that is shared by both read and write operations. It is this common buffer to which data is written and from which it is read. The common buffer is used by functions of the streambuf as well as the iostream. Because of this, and because it is shared by both read and write operations, there is a danger of data corruption if reads and writes are allowed to take place "at the same time".

To prevent data corruption, we manipulate the read and write pointer sets so that the streambuf is in either read mode or write mode at all times and can never be in both modes at the same time:

In the constructor, set the read and write pointer sets to NULL. This causes the underflow or overflow function to be invoked at the first IO activity of the iostream.

In the underflow function, we arrange for the common buffer to reference our read buffer and for the write pointer set to be disabled. If a write operation is then performed by the iostream, this will cause the overflow function to be invoked.

In the overflow function, we arrange for the common buffer to reference our write buffer and for the read pointer set to be disabled. This causes the underflow function to be invoked when the iostream "changes our mode".

The overflow function will also invoke the send_n function to flush the buffered data to our peer. Similarly, the sync and syncout functions will cause send_n to be invoked to send the data. Since sockets and the like do not support seeking, there can be no method for "syncing" the input. However, since we maintain separate read/write buffers, no data is lost by "syncing" the input. It simply remains buffered.
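
The separate read and write buffers described above can be illustrated with the standard library alone. The following is a minimal sketch, not the ACE implementation: it assumes a standard std::streambuf (which has independent get and put areas, so the mode switching on a shared base buffer is not needed), and the names LoopbackPeer, DualBufferStreambuf and round_trip are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <iostream>
#include <streambuf>
#include <string>
#include <vector>

// Hypothetical in-memory peer: whatever is sent can be received back.
struct LoopbackPeer {
  std::string queue;

  std::ptrdiff_t send(const char* buf, std::ptrdiff_t len) {
    queue.append(buf, static_cast<std::size_t>(len));
    return len;
  }

  std::ptrdiff_t recv(char* buf, std::ptrdiff_t len) {
    const std::size_t n = std::min(queue.size(), static_cast<std::size_t>(len));
    std::memcpy(buf, queue.data(), n);
    queue.erase(0, n);
    return static_cast<std::ptrdiff_t>(n);
  }
};

class DualBufferStreambuf : public std::streambuf {
public:
  DualBufferStreambuf(LoopbackPeer& peer, std::size_t size)
    : peer_(peer), get_buf_(size), put_buf_(size) {
    assert(size > 0);
    // Empty get area: the first read triggers underflow().
    setg(get_buf_.data(), get_buf_.data(), get_buf_.data());
    // Put area spanning the dedicated write buffer.
    setp(put_buf_.data(), put_buf_.data() + put_buf_.size());
  }

protected:
  int_type underflow() override {
    if (gptr() < egptr())
      return traits_type::to_int_type(*gptr());
    // Send pending output before asking the peer for more input.
    if (sync() == -1)
      return traits_type::eof();
    const std::ptrdiff_t n =
      peer_.recv(get_buf_.data(), static_cast<std::ptrdiff_t>(get_buf_.size()));
    if (n <= 0)
      return traits_type::eof();
    setg(get_buf_.data(), get_buf_.data(), get_buf_.data() + n);
    return traits_type::to_int_type(*gptr());
  }

  int_type overflow(int_type c = traits_type::eof()) override {
    // Flush the full write buffer, then store the character that overflowed.
    if (sync() == -1)
      return traits_type::eof();
    if (!traits_type::eq_int_type(c, traits_type::eof())) {
      *pptr() = traits_type::to_char_type(c);
      pbump(1);
    }
    return traits_type::not_eof(c);
  }

  int sync() override {
    const std::ptrdiff_t pending = pptr() - pbase();
    if (pending > 0 && peer_.send(pbase(), pending) != pending)
      return -1;
    setp(put_buf_.data(), put_buf_.data() + put_buf_.size());
    return 0;
  }

private:
  LoopbackPeer& peer_;
  std::vector<char> get_buf_;  // read-dedicated buffer
  std::vector<char> put_buf_;  // write-dedicated buffer
};

// Writes a word through the streambuf and reads it back from the peer.
std::string round_trip(const std::string& text) {
  LoopbackPeer peer;
  DualBufferStreambuf buf(peer, 8);  // small buffer so overflow() is exercised
  std::iostream io(&buf);
  io << text << std::flush;
  std::string word;
  io >> word;
  return word;
}
```

Because the two buffers never alias, data that has been received but not yet consumed is unaffected by a flush of the write side, which is the property the description relies on.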

Definition at line 162 of file IOStream.h.


Constructor & Destructor Documentation

ACE_Streambuf::~ACE_Streambuf ( void   )  [virtual]

If the default allocation strategy were used, the common buffer would be deleted when the object destructs. Since we are providing separate read/write buffers, it is up to us to manage their memory.

Definition at line 649 of file IOStream.cpp.

{
  delete [] this->eback_saved_;
  delete [] this->pbase_saved_;
}

ACE_Streambuf::ACE_Streambuf ( u_int  streambuf_size,
int  io_mode 
) [protected]

Definition at line 478 of file IOStream.cpp.

  : eback_saved_ (0),  // to avoid Purify UMR
    pbase_saved_ (0),  // to avoid Purify UMR
    get_mode_ (1),
    put_mode_ (2),
    mode_ (io_mode),
    streambuf_size_ (streambuf_size),
    recv_timeout_ (0)
{
 (void)reset_get_buffer ();
 (void)reset_put_buffer ();
}


Member Function Documentation

int ACE_Streambuf::fillbuf ( void   )  [protected]

fillbuf is called in a couple of places. This is the worker of underflow. It will attempt to fill the read buffer from the peer.

Definition at line 446 of file IOStream.cpp.

{
  // Invoke recv_n to get exactly one byte from the remote.  This will
  // block until something shows up.

  if (get_one_byte () == EOF)
    return EOF;

  // Now, get whatever else may be in the buffer.  This will return if
  // there is nothing in the buffer.

  int bc = this->recv (base (), blen (), this->recv_timeout_);

  // recv will give us -1 if there was a problem.  If there was
  // nothing waiting to be read, it will give us 0.  That isn't an
  // error.

  if (bc < 0)
    {
      if (errno == ETIME)
        this->timeout_ = 1;
      return EOF;
    }

  // Move the get pointer to reflect the number of bytes we just read.

  setg (base (), base (), base () + bc);

  // Return the byte-read-count including the one from <get_one_byte>.
  return bc;
}

int ACE_Streambuf::flushbuf ( void   )  [protected]

flushbuf is the worker of syncout. It is a separate function because it is sometimes used in other contexts.

Definition at line 365 of file IOStream.cpp.

{
  // pptr () is one character beyond the last character put into the
  // buffer.  pbase () points to the beginning of the put buffer.
  // Unless pptr () is greater than pbase () there is nothing to be
  // sent to the peer.

  if (pptr () <= pbase ())
    return 0;

  // 4/12/97 -- JCEJ
  // Kludge!!!
  // If the remote side shuts down the connection, an attempt to send
  // () to the remote will result in the message 'Broken Pipe' I think
  // this is an OS message, I've tracked it down to the ACE_OS::write
  // () function.  That's the last one to be called before the
  // message.  I can only test this on Linux though, so I don't know
  // how other systems will react.
  //
  // To get around this gracefully, I do a PEEK recv () with an
  // immediate (nearly) timeout.  recv () is much more graceful on
  // its failure.  If we get -1 from recv () not due to timeout then
  // we know we're SOL.
  //
  // Q:  Is 'errno' threadsafe?  Should the section below be a
  //     critical section?
  //
  // char tbuf[1];
  // ACE_Time_Value to (0,1);
  // if (this->recv (tbuf, 1, MSG_PEEK, &to) == -1)
  // {
  //    if (errno != ETIME)
  //    {
  //            perror ("OOPS preparing to send to peer");
  //            return EOF;
  //    }
  // }
  //
  // The correct way to handle this is for the application to trap
  // (and ignore?) SIGPIPE.  Thanks to Amos Shapira for reminding me
  // of this.

  // Starting at the beginning of the buffer, send as much data as
  // there is waiting.  send guarantees that all of the data will be
  // sent or an error will be returned.

  if (this->send (pbase (), pptr () - pbase ()) == -1)
    return EOF;

  // Now that we've sent everything in the output buffer, we reset the
  // buffer pointers to appear empty.
  setp (base (), ebuf ());

  return 0;
}
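
The comment in flushbuf recommends that the application trap or ignore SIGPIPE. A minimal sketch of that idea follows, assuming a POSIX system and using a plain pipe instead of an ACE stream; the function name write_to_closed_pipe is hypothetical.

```cpp
#include <cassert>
#include <cerrno>
#include <csignal>
#include <unistd.h>

// Ignores SIGPIPE, then writes to a pipe whose read end is already closed.
// Returns the errno reported by write(), or -1 if the pipe could not be made.
int write_to_closed_pipe() {
  // Without this, the write below would deliver SIGPIPE, whose default
  // action terminates the process.
  std::signal(SIGPIPE, SIG_IGN);

  int fds[2];
  if (pipe(fds) != 0)
    return -1;
  close(fds[0]);  // simulate the remote side shutting down

  const char byte = 'x';
  errno = 0;
  const ssize_t rc = write(fds[1], &byte, 1);
  const int saved = errno;
  close(fds[1]);

  assert(rc == -1);  // the write fails instead of killing the process
  return saved;
}
```

With the signal ignored, the failed write surfaces as an ordinary error return (EPIPE), which a send wrapper can translate into EOF for the streambuf.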

u_int ACE_Streambuf::get_avail ( void   ) 

Return the number of bytes in the get area (includes some already gotten); eback + get_avail = egptr

Definition at line 510 of file IOStream.cpp.

{
  return this->egptr_saved_ - this->eback_saved_;
}

ACE_HANDLE ACE_Streambuf::get_handle ( void   )  [protected, virtual]

Reimplemented in ACE_Streambuf_T< STREAM >.

Definition at line 92 of file IOStream.cpp.

{
  return 0;
}

int ACE_Streambuf::get_one_byte ( void   )  [protected, virtual]

Used by fillbuf and others to get exactly one byte from the peer. recv_n is used to be sure we block until something is available. It is virtual because we really need to override it for datagram-derived objects.

Definition at line 422 of file IOStream.cpp.

{
  this->timeout_ = 0;

  // The recv function will return immediately if there is no data
  // waiting.  So, we use recv_n to wait for exactly one byte to come
  // from the peer.  Later, we can use recv to see if there is
  // anything else in the buffer. (Ok, we could use flags to tell it
  // to block but I like this better.)

  if (this->recv_n (base (), 1, MSG_PEEK, this->recv_timeout_) != 1)
    {
      if (errno == ETIME)
        this->timeout_ = 1;
      return EOF;
    }
  else
    return 1;
}
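
The pattern used by get_one_byte and fillbuf (peek one byte, then read whatever is queued, so that the count includes the peeked byte) can be sketched with plain POSIX sockets. This is an illustration under assumptions, not the ACE code path: socketpair stands in for a connected peer, MSG_DONTWAIT stands in for the non-blocking behaviour the comments describe, and peek_then_drain is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Sends "hello" over a local socket pair, peeks one byte on the other end,
// then drains the queue without blocking. Returns the size of the second
// read, or -1 on failure.
ssize_t peek_then_drain(char* out, std::size_t cap) {
  int sv[2];
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
    return -1;

  const char msg[] = "hello";
  ssize_t got = -1;
  if (send(sv[0], msg, 5, 0) == 5) {
    // Step 1: wait for one byte but leave it in the socket's queue.
    char first = 0;
    const ssize_t peeked = recv(sv[1], &first, 1, MSG_PEEK);
    assert(peeked != 1 || first == msg[0]);

    // Step 2: read everything that is queued, including the peeked byte.
    if (peeked == 1)
      got = recv(sv[1], out, cap, MSG_DONTWAIT);
  }
  close(sv[0]);
  close(sv[1]);
  return got;
}
```

Because the first read only peeks, the second read returns all five bytes, which matches the comment in fillbuf that the returned count includes the byte from get_one_byte.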

u_int ACE_Streambuf::get_waiting ( void   ) 

Return the number of bytes not yet gotten. eback + get_waiting = gptr

Definition at line 501 of file IOStream.cpp.

{
  return this->gptr_saved_ - this->eback_saved_;
}
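
The identities quoted for get_avail and get_waiting are plain pointer differences. The sketch below works them through on a small buffer; GetArea and unread are hypothetical names introduced for illustration. Read literally, the expression in get_waiting is the offset of the saved get pointer from the start of the buffer.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for the three saved get-area pointers.
struct GetArea {
  char* eback;  // start of the read buffer
  char* gptr;   // next character to be read
  char* egptr;  // one past the last readable character
};

// Mirrors get_waiting(): eback + get_waiting == gptr.
std::size_t get_waiting(const GetArea& a) {
  assert(a.eback <= a.gptr);
  return static_cast<std::size_t>(a.gptr - a.eback);
}

// Mirrors get_avail(): eback + get_avail == egptr.
std::size_t get_avail(const GetArea& a) {
  assert(a.eback <= a.egptr);
  return static_cast<std::size_t>(a.egptr - a.eback);
}

// Hypothetical helper: characters between gptr and egptr.
std::size_t unread(const GetArea& a) {
  return get_avail(a) - get_waiting(a);
}
```

For a 16-byte buffer holding 10 received bytes of which 3 have been consumed, get_waiting is 3, get_avail is 10, and 7 bytes remain between gptr and egptr.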

int ACE_Streambuf::overflow ( int  c = EOF  )  [protected, virtual]

The overflow function receives the character which caused the overflow.

Definition at line 237 of file IOStream.cpp.

{
  // Check to see if output is allowed at all.
  if (! (mode_ & ios::out))
    return EOF;

  if (!base ())
    {
      // Set base () to use put's private buffer.
      //
      setb (this->pbase_saved_,
            this->pbase_saved_ + streambuf_size_, 0);

      // Set the mode for optimization.
      this->cur_mode_ = this->put_mode_;
      // Set the put area using the new base () values.
      setp (base (), ebuf ());

      // Disable the get area.
      setg (0, 0, 0);
    }
  else  // We're already reading or writing
    {
      // If we're coming out of get mode...
      if (this->cur_mode_ == this->get_mode_)
        {
          // --> JCEJ 6/6/98
          if (! eback())
            {
              /* Something has happened to cause the streambuf
                 to get rid of our get area.
                 We could probably do this a bit cleaner but
                 this method is sure to cleanup the bits and
                 pieces.
              */
              delete [] eback_saved_;
              (void) reset_get_buffer();
            }
          else
            {
              // Save the current get mode values
              this->eback_saved_ = eback ();
              this->gptr_saved_  = gptr ();
              this->egptr_saved_ = egptr ();
            }
          // <-- JCEJ 6/6/98

          // then disable the get buffer
          setg (0, 0, 0);

          // Reconfigure base () and restore the put pointers.
          setb (pbase_saved_, pbase_saved_ + streambuf_size_, 0);
          setp (base (), ebuf ());

          // Save the new mode.
          this->cur_mode_ = this->put_mode_;
        }

      // If there is output to be flushed, do so now.  We shouldn't
      // get here unless this is the case...

      if (out_waiting () && EOF == syncout ())
        return EOF;
    }

  // If we're not putting EOF, then we have to deal with the character
  // that is being put.  Perhaps we should do something special with EOF???

  if (c != EOF)
    {
      // We've already written any data that may have been in the
      // buffer, so we're guaranteed to have room in the buffer for
      // this new information.  So... we add it to the buffer and
      // adjust our 'next' pointer accordingly.
      *pptr () = (char) c;
      pbump (1);
    }

  return 0;
}

u_int ACE_Streambuf::put_avail ( void   ) 

Return the number of bytes to be 'put' onto the stream media. pbase + put_avail = pptr

Definition at line 519 of file IOStream.cpp.

{
  return this->pptr_saved_ - this->pbase_saved_;
}

virtual ssize_t ACE_Streambuf::recv ( char *  buf,
ssize_t  len,
ACE_Time_Value *  tv = 0 
) [protected, pure virtual]

Implemented in ACE_Streambuf_T< STREAM >.

virtual ssize_t ACE_Streambuf::recv ( char *  buf,
ssize_t  len,
int  flags,
ACE_Time_Value *  tv = 0 
) [protected, pure virtual]

Implemented in ACE_Streambuf_T< STREAM >.

virtual ssize_t ACE_Streambuf::recv_n ( char *  buf,
ssize_t  len,
int  flags = 0,
ACE_Time_Value *  tv = 0 
) [protected, pure virtual]

Implemented in ACE_Streambuf_T< STREAM >.

ACE_Time_Value * ACE_Streambuf::recv_timeout ( ACE_Time_Value *  tv = 0  ) 

Get the current Time_Value pointer and provide a new one.

Definition at line 98 of file IOStream.cpp.

{
  ACE_Time_Value * rval = recv_timeout_;
  if (tv)
    {
      recv_timeout_value_ = *tv;
      recv_timeout_ = &recv_timeout_value_;
    }
  else
    recv_timeout_ = 0;

  return rval;
}
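
The get-and-replace behaviour of recv_timeout can be sketched without ACE. The following assumes a hypothetical TimeValue struct in place of ACE_Time_Value and a hypothetical TimeoutHolder class that copies the logic shown above.

```cpp
#include <cassert>

// Hypothetical stand-in for ACE_Time_Value.
struct TimeValue {
  long sec;
  long usec;
};

class TimeoutHolder {
public:
  TimeoutHolder() : value_{0, 0}, current_(nullptr) {}

  // Returns the previously active pointer and installs a copy of *tv,
  // or disables the timeout when tv is null.
  TimeValue* recv_timeout(TimeValue* tv = nullptr) {
    TimeValue* previous = current_;
    if (tv) {
      value_ = *tv;        // copy, so the caller's object may go away
      current_ = &value_;  // always points at internal storage
    } else {
      current_ = nullptr;
    }
    return previous;
  }

  const TimeValue* current() const { return current_; }

private:
  TimeValue value_;
  TimeValue* current_;
};

// Walks through the three interesting cases.
bool demo_recv_timeout() {
  TimeoutHolder h;
  TimeValue five{5, 0};
  TimeValue* before = h.recv_timeout(&five);  // nothing was set before
  five.sec = 99;                              // does not affect the stored copy
  const bool copied = before == nullptr && h.current()->sec == 5;

  TimeValue one{1, 0};
  TimeValue* prev = h.recv_timeout(&one);
  // prev points at the internal storage, which already holds the new value.
  const bool aliased = prev != nullptr && prev->sec == 1;

  assert(copied && aliased);
  return copied && aliased;
}
```

One consequence visible in the sketch: a non-null return value points at the object's internal storage, so a caller that wants to restore the old timeout later would need to copy the value before making the call.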

void ACE_Streambuf::reset_base ( void   )  [protected]

Resets the <base> pointer and streambuf mode. This is used internally when get/put buffers are allocated.

Definition at line 631 of file IOStream.cpp.

{
  // Until we experience the first get or put operation, we do not
  // know what our current IO mode is.
  this->cur_mode_ = 0;

  // The common area used for reading and writing is called "base".
  // We initialize it this way so that the first get/put operation
  // will have to "allocate" base.  This allocation will set base to
  // the appropriate specific buffer and set the mode to the correct
  // value.
  setb (0, 0);
}

char * ACE_Streambuf::reset_get_buffer ( char *  newBuffer = 0,
u_int  _streambuf_size = 0,
u_int  _gptr = 0,
u_int  _egptr = 0 
)

Use this to allocate a new/different buffer for get operations. If you do not provide a buffer pointer, one will be allocated. That is the preferred method. If you do provide a buffer, the size must match that being used by the put buffer. If successful, you will receive a pointer to the current get buffer. It is your responsibility to delete this memory when you are done with it.

Definition at line 535 of file IOStream.cpp.

{
  char * rval = this->eback_saved_;

  // The get area is where the iostream will get data from.  This is
  // our read buffer.  There are three pointers which describe the
  // read buffer:
  //
  //    eback () - The beginning of the buffer.  Also the furthest
  //              point at which putbacks can be done.  Hence the name.
  //
  //    gptr ()  - Where the next character is to be got from.
  //
  //    egptr () - One position beyond the last get-able character.
  //
  // So that we can switch quickly from read to write mode without
  // any data copying, we keep copies of these three pointers in
  // the variables below.  Initially, they all point to the beginning
  // of our read-dedicated buffer.
  //
  if (newBuffer)
    {
      if (streambuf_size_ != _streambuf_size)
        return 0;
      this->eback_saved_ = newBuffer;
    }
  else
    ACE_NEW_RETURN (this->eback_saved_,
                    char[streambuf_size_],
                    0);

  this->gptr_saved_ = this->eback_saved_ + _gptr;
  this->egptr_saved_ = this->eback_saved_ + _egptr;

  // Disable the get area initially.  This will cause underflow to be
  // invoked on the first get operation.
  setg (0, 0, 0);

  reset_base ();

  return rval;
}
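
The ownership contract of reset_get_buffer and reset_put_buffer (the previous buffer is handed back and the caller must delete it, and a caller-supplied buffer must match the configured size) can be sketched as follows. BufferSlot is a hypothetical class written for illustration and is not part of ACE.

```cpp
#include <cassert>
#include <cstddef>

class BufferSlot {
public:
  explicit BufferSlot(std::size_t size)
    : size_(size), buffer_(new char[size]) {}
  ~BufferSlot() { delete[] buffer_; }
  BufferSlot(const BufferSlot&) = delete;
  BufferSlot& operator=(const BufferSlot&) = delete;

  // Installs new_buffer (or allocates one) and returns the previous buffer,
  // which the caller must delete[]. Returns nullptr and changes nothing if a
  // supplied buffer does not have the configured size.
  char* reset(char* new_buffer = nullptr, std::size_t new_size = 0) {
    char* previous = buffer_;
    if (new_buffer) {
      if (new_size != size_)
        return nullptr;
      buffer_ = new_buffer;
    } else {
      buffer_ = new char[size_];
    }
    return previous;
  }

  char* get() const { return buffer_; }

private:
  const std::size_t size_;
  char* buffer_;
};

// Swaps in a caller-owned buffer, then tries one of the wrong size.
bool demo_buffer_slot() {
  BufferSlot slot(32);
  char* mine = new char[32];
  char* old = slot.reset(mine, 32);
  const bool swapped = old != nullptr && slot.get() == mine;
  delete[] old;  // the caller now owns the previous buffer

  char* wrong = new char[16];
  const bool rejected = slot.reset(wrong, 16) == nullptr && slot.get() == mine;
  delete[] wrong;  // rejected buffers stay with the caller

  assert(swapped && rejected);
  return swapped && rejected;
}
```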

char * ACE_Streambuf::reset_put_buffer ( char *  newBuffer = 0,
u_int  _streambuf_size = 0,
u_int  _pptr = 0 
)

Use this to allocate a new/different buffer for put operations. If you do not provide a buffer pointer, one will be allocated. That is the preferred method. If you do provide a buffer, the size must match that being used by the get buffer. If successful, you will receive a pointer to the current put buffer. It is your responsibility to delete this memory when you are done with it.

Definition at line 588 of file IOStream.cpp.

{
  char *rval = this->pbase_saved_;

  // The put area is where the iostream will put data that needs to be
  // sent to the peer.  This becomes our write buffer.  The three
  // pointers which maintain this area are:
  //
  //    pbase () - The beginning of the put area.
  //
  //    pptr ()  - Where the next character is to be put.
  //
  //    epptr () - One beyond the last valid position for putting.
  //
  // Again to switch quickly between modes, we keep copies of
  // these three pointers.
  //
  if (newBuffer)
    {
      if (streambuf_size_ != _streambuf_size)
        return 0;
      this->pbase_saved_ = newBuffer;
    }
  else
    ACE_NEW_RETURN (this->pbase_saved_,
                    char[streambuf_size_],
                    0);

  this->pptr_saved_ = this->pbase_saved_ + _pptr;
  this->epptr_saved_ = this->pbase_saved_ + streambuf_size_;

  // Disable the put area.  Overflow will be called by the first call
  // to any put operator.
  setp (0, 0);

  reset_base ();

  return rval;
}

virtual ssize_t ACE_Streambuf::send ( char *  buf,
ssize_t  len 
) [protected, pure virtual]

Stream connections and "unconnected connections" (i.e., datagrams) need to work just a little differently. We derive custom Streambuf objects for them and provide these functions at that time.

Implemented in ACE_Streambuf_T< STREAM >.
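
One way to supply such transport functions is a template that forwards them to a wrapped stream object. The sketch below shows that general shape only; TransportHooks, TransportHooksT and MemoryStream are hypothetical names, and the code makes no claim about how ACE_Streambuf_T is actually written.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>

// Hypothetical abstract interface with the transport operations left open.
class TransportHooks {
public:
  virtual ~TransportHooks() = default;
  virtual std::ptrdiff_t send(const char* buf, std::ptrdiff_t len) = 0;
  virtual std::ptrdiff_t recv(char* buf, std::ptrdiff_t len) = 0;
};

// Forwards the operations to any type offering send_n() and recv().
template <class Stream>
class TransportHooksT : public TransportHooks {
public:
  explicit TransportHooksT(Stream& peer) : peer_(peer) {}

  std::ptrdiff_t send(const char* buf, std::ptrdiff_t len) override {
    return peer_.send_n(buf, len);
  }

  std::ptrdiff_t recv(char* buf, std::ptrdiff_t len) override {
    return peer_.recv(buf, len);
  }

private:
  Stream& peer_;
};

// Hypothetical in-memory stream used only to exercise the adapter.
struct MemoryStream {
  std::string data;

  std::ptrdiff_t send_n(const char* buf, std::ptrdiff_t len) {
    assert(len >= 0);
    data.append(buf, static_cast<std::size_t>(len));
    return len;
  }

  std::ptrdiff_t recv(char* buf, std::ptrdiff_t len) {
    assert(len >= 0);
    const std::size_t n = std::min(data.size(), static_cast<std::size_t>(len));
    std::memcpy(buf, data.data(), n);
    data.erase(0, n);
    return static_cast<std::ptrdiff_t>(n);
  }
};
```

A datagram-style stream could be wrapped the same way, with a different adapter overriding only the operations that need to behave differently.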

u_int ACE_Streambuf::streambuf_size ( void   ) 

Query the streambuf for the size of its buffers.

Definition at line 492 of file IOStream.cpp.

{
  return streambuf_size_;
}

int ACE_Streambuf::sync ( void   )  [protected, virtual]

Sync both input and output. See syncin/syncout below for descriptions.

Definition at line 345 of file IOStream.cpp.

{
  // sync () is fairly traditional in that it syncs both input and
  // output.  We could have omitted the call to syncin () but someday,
  // we may want it to do something.

  syncin ();

  // Don't bother syncing the output unless there is data to be
  // sent...

  if (out_waiting ())
    return syncout ();
  else
    return 0;
}

int ACE_Streambuf::syncin ( void   )  [protected]

syncin is called when the input needs to be synced with the source file. In a filebuf, this results in the <seek> system call being used. We can't do that on socket-like connections, so this does basically nothing. That's safe because we have a separate read buffer to maintain the already-read data. In a filebuf, the single common buffer is used forcing the <seek> call.

Definition at line 321 of file IOStream.cpp.

{
  // As discussed, there really isn't any way to sync input from a
  // socket-like device.  We specifically override this base-class
  // function so that it won't do anything evil to us.
  return 0;
}

int ACE_Streambuf::syncout ( void   )  [protected]

syncout is called when the output needs to be flushed. This is easily done by calling the peer's send_n function.

Definition at line 332 of file IOStream.cpp.

{
  // Unlike syncin, syncout is a doable thing.  All we have to do is
  // write whatever is in the output buffer to the peer.  flushbuf ()
  // is how we do it.

  if (flushbuf () == EOF)
    return EOF;
  else
    return 0;
}

u_char ACE_Streambuf::timeout ( void   ) 

Did we take an error because of an IO operation timeout?

Note:
Invoking this resets the flag.

Definition at line 655 of file IOStream.cpp.

{
  u_char rval = this->timeout_;
  this->timeout_ = 0;
  return rval;
}
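
Since reading the flag also clears it, the result should be stored if it is needed more than once. A minimal sketch of the read-and-reset behaviour, using a hypothetical TimeoutFlag class:

```cpp
#include <cassert>

class TimeoutFlag {
public:
  // Records that an IO operation timed out.
  void mark() { flag_ = 1; }

  // Returns the flag and clears it, like ACE_Streambuf::timeout().
  unsigned char take() {
    const unsigned char value = flag_;
    flag_ = 0;
    return value;
  }

private:
  unsigned char flag_ = 0;
};

// The first query sees the timeout, the second no longer does.
bool demo_timeout_flag() {
  TimeoutFlag flag;
  flag.mark();
  const bool first = flag.take() == 1;
  const bool second = flag.take() == 0;
  assert(first && second);
  return first && second;
}
```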

int ACE_Streambuf::underflow ( void   )  [protected, virtual]

Definition at line 113 of file IOStream.cpp.

{
  // If input mode is not set, any attempt to read from the stream is
  // a failure.

  if (ACE_BIT_DISABLED (mode_, ios::in))
    return EOF;

  // If base () is empty then this is the first time any get/put
  // operation has been attempted on the stream.

  if (!this->base ())
    {
      // Set base () to use our private read buffer.  The arguments are:
      //        beginning of the buffer (base ())
      //        one-beyond the end of the buffer (ebuf ())
      //        should base () be deleted on destruction
      //
      // We have to say "no" to the third parameter because we want to
      // explicitly handle deletion of the TWO buffers at destruction.

      setb (this->eback_saved_,
            this->eback_saved_ + streambuf_size_, 0);

      // Remember that we are now in getMode.  This will help us if
      // we're called prior to a mode change as well as helping us
      // when the mode does change.
      this->cur_mode_ = this->get_mode_;
      // Using the new values for base (), initialize the get area.
      // This simply sets eback (), gptr () and egptr () described
      // earlier.
      setg (base (), base (), base ());

      // Set the put buffer such that puts will be disabled.  Any
      // attempt to put data will now cause overflow to be invoked.
      setp (0, 0);
    }
  else  // base () has been initialized already...
    {
      // If we are in put_mode_ now, then it is time to switch to get_mode_
      //
      // 1. get rid of any pending output
      // 2. rearrange base () to use our half of the buffer
      // 3. reset the mode
      //
      if (this->cur_mode_ == this->put_mode_)
        {
          // Dump any pending output to the peer.  This is not really
          // necessary because of the dual-buffer arrangement we've
          // set up but intuitively it makes sense to send the pending
          // data before we request data since the peer will probably
          // need what we're sending before it can respond.
          if (out_waiting () && syncout () == EOF)
            return EOF;

          if( ! pbase() )
            {
              delete [] pbase_saved_;
              (void) reset_put_buffer();
            }
          else
            {
              // We're about to disable put mode but before we do
              // that, we want to preserve its state.
              this->pbase_saved_ = pbase ();
              this->pptr_saved_  = pptr ();
              this->epptr_saved_ = epptr ();
            }

          // Disable put mode as described in the constructor.
          setp (0, 0);

          // Like the case where base () is false, we now point base
          // () to use our private get buffer.
          setb (this->eback_saved_,
                this->eback_saved_ + streambuf_size_,
                0);

          // And restore the previous state of the get pointers.

          setg (this->eback_saved_, this->gptr_saved_,
                this->egptr_saved_);

          // Finally, set our mode so that we don't get back into this
          // if () and so that overflow can operate correctly.
          cur_mode_ = get_mode_;
        }

      // There could be data in the input buffer if we switched to put
      // mode before reading everything.  In that case, we take this
      // opportunity to feed it back to the iostream.
      if (in_avail ())
        // Remember that we return an int so that we can give back
        // EOF.  The explicit cast prevents us from returning a signed
        // char when we're not returning EOF.
        return (u_char) *gptr ();
    }

  // We really shouldn't be here unless there is a lack of data in the
  // read buffer.  So... go get some more data from the peer.

  int result = fillbuf ();

  // Fillbuf will give us EOF if there was an error with the peer.  In
  // that case, we can do no more input.

  if (EOF == result)
    {
      // Disable ourselves and return failure to the iostream.  That
      // should result in a call to have ourselves closed.
      setg (0, 0, 0);
      return EOF;
    }

  // Return the next available character in the input buffer.  Again,
  // we protect against sign extension.

  return (u_char) *gptr ();
}
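
The (u_char) casts in underflow guard against sign extension: a byte with the high bit set would otherwise become a negative int and could be mistaken for EOF. A short worked example, using signed char explicitly so the effect does not depend on whether plain char is signed on a given platform (the function names are hypothetical):

```cpp
#include <cassert>
#include <cstdio>  // EOF

// Converts directly: a byte of 0xFF becomes -1 through sign extension.
int to_int_unsafe(signed char c) {
  return c;
}

// Converts through unsigned char first: the result is always 0..255.
int to_int_safe(signed char c) {
  const int value = static_cast<unsigned char>(c);
  assert(value >= 0);  // can never collide with a negative EOF
  return value;
}
```

With EOF defined as -1, as it is on common platforms, the unsafe conversion of the byte 0xFF is indistinguishable from EOF, while the safe conversion yields 255.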


Member Data Documentation

u_char ACE_Streambuf::cur_mode_ [protected]

Definition at line 254 of file IOStream.h.

char* ACE_Streambuf::eback_saved_ [protected]

Definition at line 244 of file IOStream.h.

char* ACE_Streambuf::egptr_saved_ [protected]

Definition at line 246 of file IOStream.h.

char* ACE_Streambuf::epptr_saved_ [protected]

Definition at line 249 of file IOStream.h.

const u_char ACE_Streambuf::get_mode_ [protected]

Definition at line 255 of file IOStream.h.

char* ACE_Streambuf::gptr_saved_ [protected]

Definition at line 245 of file IOStream.h.

int ACE_Streambuf::mode_ [protected]

mode tells us if we're working for an istream, ostream, or iostream.

Definition at line 260 of file IOStream.h.

char* ACE_Streambuf::pbase_saved_ [protected]

Definition at line 247 of file IOStream.h.

char* ACE_Streambuf::pptr_saved_ [protected]

Definition at line 248 of file IOStream.h.

const u_char ACE_Streambuf::put_mode_ [protected]

Definition at line 256 of file IOStream.h.

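ACE_Time_Value* ACE_Streambuf::recv_timeout_ [protected]

Definition at line 272 of file IOStream.h.

ACE_Time_Value ACE_Streambuf::recv_timeout_value_ [protected]

We want to allow the user to provide Time_Value pointers to prevent infinite blocking while waiting to receive data.

Definition at line 271 of file IOStream.h.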

const u_int ACE_Streambuf::streambuf_size_ [protected]

This defines the size of the input and output buffers. It can be set by the object constructor.

Definition at line 264 of file IOStream.h.

u_char ACE_Streambuf::timeout_ [protected]

Did we take an error because of an IO operation timeout?

Definition at line 267 of file IOStream.h.


The documentation for this class was generated from the following files: