Skip to content

Commit

Permalink
s2: Add example for indexing and existing stream (#723)
Browse files Browse the repository at this point in the history
Also allow seek after EOF.
  • Loading branch information
klauspost authored Jan 2, 2023
1 parent 053e2a6 commit 8b191e4
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 1 deletion.
6 changes: 5 additions & 1 deletion s2/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,11 @@ func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) {
// Seek allows seeking in compressed data.
func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
if r.err != nil {
return 0, r.err
if !errors.Is(r.err, io.EOF) {
return 0, r.err
}
// Reset on EOF
r.err = nil
}
if offset == 0 && whence == io.SeekCurrent {
return r.blockStart + int64(r.i), nil
Expand Down
174 changes: 174 additions & 0 deletions s2/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"math/rand"
"os"
"sync"
"testing"

Expand Down Expand Up @@ -234,3 +235,176 @@ func TestSeeking(t *testing.T) {
})
}
}

// ExampleIndexStream shows an example of indexing a stream
// and indexing it after it has been written.
// The index can either be appended.
func ExampleIndexStream() {
fatalErr := func(err error) {
if err != nil {
panic(err)
}
}

// Create a test stream without index
var streamName = ""
tmp := make([]byte, 5<<20)
{
rng := rand.New(rand.NewSource(0xbeefcafe))
rng.Read(tmp)
// Make it compressible...
for i, v := range tmp {
tmp[i] = '0' + v&3
}
// Compress it...
output, err := os.CreateTemp("", "IndexStream")
streamName = output.Name()
fatalErr(err)

// We use smaller blocks just for the example...
enc := s2.NewWriter(output, s2.WriterSnappyCompat())
err = enc.EncodeBuffer(tmp)
fatalErr(err)

// Close and get index...
err = enc.Close()
fatalErr(err)
err = output.Close()
fatalErr(err)
}

// Open our compressed stream without an index...
stream, err := os.Open(streamName)
fatalErr(err)
defer stream.Close()

var indexInput = io.Reader(stream)
var indexOutput io.Writer
var indexedName string

// Should index be combined with stream by appending?
// This could also be done by appending to an os.File
// If not it will be written to a separate file.
const combineOutput = false

// Function to easier use defer.
func() {
if combineOutput {
output, err := os.CreateTemp("", "IndexStream-Combined")
fatalErr(err)
defer func() {
fatalErr(output.Close())
if false {
fi, err := os.Stat(output.Name())
fatalErr(err)
fmt.Println("Combined:", fi.Size(), "bytes")
} else {
fmt.Println("Index saved")
}
}()

// Everything read from stream will also be written to output.
indexedName = output.Name()
indexInput = io.TeeReader(stream, output)
indexOutput = output
} else {
output, err := os.CreateTemp("", "IndexStream-Index")
fatalErr(err)
defer func() {
fatalErr(output.Close())
fi, err := os.Stat(output.Name())
fatalErr(err)
if false {
fmt.Println("Index:", fi.Size(), "bytes")
} else {
fmt.Println("Index saved")
}
}()
indexedName = output.Name()
indexOutput = output
}

// Index the input
idx, err := s2.IndexStream(indexInput)
fatalErr(err)

// Write the index
_, err = indexOutput.Write(idx)
fatalErr(err)
}()

if combineOutput {
// Read from combined stream only.
stream, err := os.Open(indexedName)
fatalErr(err)
defer stream.Close()
// Create a reader with the input.
// We assert that the stream is an io.ReadSeeker.
r := s2.NewReader(io.ReadSeeker(stream))

// Request a ReadSeeker with random access.
// This will load the index from the stream.
rs, err := r.ReadSeeker(true, nil)
fatalErr(err)

_, err = rs.Seek(-10, io.SeekEnd)
fatalErr(err)

b, err := io.ReadAll(rs)
fatalErr(err)
if want := tmp[len(tmp)-10:]; !bytes.Equal(b, want) {
fatalErr(fmt.Errorf("wanted %v, got %v", want, b))
}
fmt.Println("last 10 bytes read")

_, err = rs.Seek(10, io.SeekStart)
fatalErr(err)
_, err = io.ReadFull(rs, b)
fatalErr(err)
if want := tmp[10:20]; !bytes.Equal(b, want) {
fatalErr(fmt.Errorf("wanted %v, got %v", want, b))
}
fmt.Println("10 bytes at offset 10 read")
} else {
// Read from separate stream and index.
stream, err := os.Open(streamName)
fatalErr(err)
defer stream.Close()
// Create a reader with the input.
// We assert that the stream is an io.ReadSeeker.
r := s2.NewReader(io.ReadSeeker(stream))

// Read the separate index.
index, err := os.ReadFile(indexedName)
fatalErr(err)

// Request a ReadSeeker with random access.
// The provided index will be used.
rs, err := r.ReadSeeker(true, index)
fatalErr(err)

_, err = rs.Seek(-10, io.SeekEnd)
fatalErr(err)

b, err := io.ReadAll(rs)
fatalErr(err)
if want := tmp[len(tmp)-10:]; !bytes.Equal(b, want) {
fatalErr(fmt.Errorf("wanted %v, got %v", want, b))
}
fmt.Println("last 10 bytes read")

_, err = rs.Seek(10, io.SeekStart)
fatalErr(err)
_, err = io.ReadFull(rs, b)
fatalErr(err)
if want := tmp[10:20]; !bytes.Equal(b, want) {
fatalErr(fmt.Errorf("wanted %v, got %v", want, b))
}
fmt.Println("10 bytes at offset 10 read")
}

// OUTPUT:
// Index saved
// last 10 bytes read
// 10 bytes at offset 10 read
}

0 comments on commit 8b191e4

Please sign in to comment.