Skip to content

Commit

Permalink
Add new convenience methods on Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
awnumar committed Mar 11, 2020
1 parent d0b79d8 commit ba09d42
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 11 deletions.
26 changes: 18 additions & 8 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,8 @@ func (s *Stream) Read(buf []byte) (int, error) {
s.Lock()
defer s.Unlock()

// Pop data from the front of the list.
e := s.pop()
if e == nil {
return 0, io.EOF
}

// Decrypt the data into a guarded allocation.
b, err := e.Open()
// Grab the next chunk of data from the stream.
b, err := s.Next()
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -122,6 +116,22 @@ func (s *Stream) Size() int {
return n
}

// Next grabs the next chunk of data from the Stream and returns it decrypted inside a LockedBuffer. Any error from the stream is forwarded.
func (s *Stream) Next() (*LockedBuffer, error) {
// Pop data from the front of the list.
e := s.pop()
if e == nil {
return newNullBuffer(), io.EOF
}

// Decrypt the data into a guarded allocation.
b, err := e.Open()
if err != nil {
return newNullBuffer(), err
}
return b, nil
}

// Flush reads all of the data from a Stream and returns it inside a LockedBuffer. If an error is encountered before all the data could be read, it is returned along with any data read up until that point.
func (s *Stream) Flush() (*LockedBuffer, error) {
return NewBufferFromEntireReader(s)
Expand Down
38 changes: 35 additions & 3 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,29 @@ func read(t *testing.T, s *Stream, ref []byte, expectedErr error) {
}
}

func TestFlush(t *testing.T) {
s := NewStream()

size := 2*os.Getpagesize() + 1024
b := make([]byte, size)
ScrambleBytes(b)
ref := make([]byte, len(b))
copy(ref, b)
write(t, s, b)

c, err := s.Flush()
if err != nil {
t.Error(err)
}
if c.Size() != size {
t.Error("unexpected length:", c.Size())
}
if !c.EqualTo(ref) {
t.Error("incorrect data")
}
c.Destroy()
}

func TestStreamReadWrite(t *testing.T) {
// Create new stream object.
s := NewStream()
Expand Down Expand Up @@ -107,7 +130,10 @@ func TestStreamingSanity(t *testing.T) {
write(t, s, b)

// read it back exactly
c := NewBufferFromReader(s, size)
c, err := NewBufferFromReader(s, size)
if err != nil {
t.Error(err)
}
if c.Size() != size {
t.Error("not enough data read back")
}
Expand All @@ -124,7 +150,10 @@ func TestStreamingSanity(t *testing.T) {
write(t, s, b)

// read it all back
c = NewBufferFromEntireReader(s)
c, err = NewBufferFromEntireReader(s)
if err != nil {
t.Error(err)
}
if c.Size() != size {
t.Error("not enough data read back")
}
Expand All @@ -143,7 +172,10 @@ func TestStreamingSanity(t *testing.T) {
write(t, s, b)

// read it back until the delimiter
c = NewBufferFromReaderUntil(s, 'x')
c, err = NewBufferFromReaderUntil(s, 'x')
if err != nil {
t.Error(err)
}
if c.Size() != size-1 {
t.Error("not enough data read back:", c.Size(), "want", size-1)
}
Expand Down

0 comments on commit ba09d42

Please sign in to comment.