Commit: Clean up interfaces and docs.

klauspost committed Jan 7, 2022
1 parent a216d71 · commit fe4d285

Showing 5 changed files with 248 additions and 124 deletions.
92 changes: 80 additions & 12 deletions s2/README.md
@@ -683,9 +683,42 @@ Snappy blocks/streams can safely be concatenated with S2 blocks and streams.

# Stream Seek Index

S2 and Snappy streams can have indexes. These indexes will allow random seeking within the compressed data.

The index can either be appended to the stream as a skippable block or returned for separate storage.

When the index is appended to a stream it will be skipped by regular decoders,
so the output remains compatible with other decoders.

## Creating an Index

To automatically add an index to a stream, add the `WriterAddIndex()` option to your writer.
The index will then be added to the stream when `Close()` is called.

If you want to store the index separately, use `CloseIndex()` instead of the regular `Close()`.
This will return the index. Note that `CloseIndex()` should only be called once, and you should not call `Close()` as well.

## Using Indexes

To use indexes, there is a `ReadSeeker(random bool, index []byte) (*ReadSeeker, error)` method available on the `Reader`.

Calling ReadSeeker will return an [io.ReadSeeker](https://pkg.go.dev/io#ReadSeeker) compatible version of the reader.

If 'random' is specified the returned io.Seeker can be used for random seeking, otherwise only forward seeking is supported.
Enabling random seeking requires the original input to support the [io.Seeker](https://pkg.go.dev/io#Seeker) interface.

A custom index can be supplied, in which case it will be used instead of reading the index from the input stream.

The returned [ReadSeeker](https://pkg.go.dev/github.com/klauspost/compress/s2#ReadSeeker) contains a shallow reference to the existing Reader,
meaning changes performed to one are reflected in the other.

Indexes can also be read outside the decoder using the [Index](https://pkg.go.dev/github.com/klauspost/compress/s2#Index) type.
This can be used for parsing indexes, either separate or in streams.

## Index Format:

Each block is structured as a snappy skippable block, with the chunk ID 0x99.

The block can be read from the front, but contains information so it can be read from the back as well.

@@ -694,41 +727,76 @@ with un-encoded value length of 64 bits, unless other limits are specified.

| Content | Format |
|---------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|
| ID, `[1]byte` | Always 0x99. |
| Data Length, `[3]byte` | 3 byte little-endian length of the chunk in bytes, following this. |
| Header `[6]byte` | Header, must be `[115, 50, 105, 100, 120, 0]` or in text: "s2idx\x00". |
| UncompressedSize, Varint | Total Uncompressed size if known. Should be -1 if unknown. |
| CompressedSize, Varint | Total Compressed size if known. Should be -1 if unknown. |
| EstBlockSize, Varint | Block Size, used for guessing uncompressed offsets. Must be >= 0. |
| Entries, Varint | Number of Entries in index, must be < 65536 and >=0. |
| HasUncompressedOffsets `byte` | 0 if no uncompressed offsets are present, 1 if present. |
| UncompressedOffsets, [Entries]VarInt | Uncompressed offsets. See below how to decode. |
| CompressedOffsets, [Entries]VarInt | Compressed offsets. See below how to decode. |
| Block Size, `[4]byte` | Little Endian total encoded size (including header and trailer). Can be used for searching backwards to start of block. |
| Trailer `[6]byte` | Trailer, must be `[0, 120, 100, 105, 50, 115]` or in text: "\x00xdi2s". Can be used for identifying block from end of stream. |

For regular streams the uncompressed offsets are fully predictable,
so `HasUncompressedOffsets` allows specifying that all compressed blocks have
exactly `EstBlockSize` bytes of uncompressed content.

Entries *must* be in order, starting with the lowest offset,
and there *must* be no uncompressed offset duplicates.
Entries *may* point to the start of a skippable block,
but it is then not allowed to also have an entry for the next block since
that would give an uncompressed offset duplicate.

There is no requirement for all blocks to be represented in the index.
In fact, an index can contain at most 65536 block entries.

The writer can use any method to reduce the number of entries.
An implicit block start at 0,0 can be assumed.

It is strongly recommended to add `UncompressedSize`,
otherwise seeking from end-of-file (tailing, for example) will not be possible.

### Decoding entries:

```
// Read Uncompressed entries.
// Each assumes EstBlockSize delta from previous.
for each entry {
	uOff = 0
	if HasUncompressedOffsets == 1 {
		uOff = ReadVarInt // Read value from stream
	}

	// Except for the first entry, use previous values.
	if entryNum == 0 {
		entry[entryNum].UncompressedOffset = uOff
		continue
	}

	// Uncompressed uses previous offset and adds EstBlockSize.
	entry[entryNum].UncompressedOffset = entry[entryNum-1].UncompressedOffset + EstBlockSize + uOff
}

// Guess that the first block will be 50% of uncompressed size.
// EstBlockSize is read previously.
// Integer truncating division must be used.
CompressGuess := EstBlockSize / 2

// Read Compressed entries.
// Each assumes CompressGuess delta from previous.
// CompressGuess is adjusted for each value.
for each entry {
	cOff = ReadVarInt // Read value from stream

	// Except for the first entry, use previous values.
	if entryNum == 0 {
		entry[entryNum].CompressedOffset = cOff
		continue
	}

	// Compressed uses previous and our estimate.
	entry[entryNum].CompressedOffset = entry[entryNum-1].CompressedOffset + CompressGuess + cOff

	// Adjust compressed offset for next loop, integer truncating division must be used.
	CompressGuess += cOff / 2
}
```
48 changes: 22 additions & 26 deletions s2/decode.go
@@ -26,12 +26,12 @@ var (

// ErrCantSeek is returned if the stream cannot be seeked.
type ErrCantSeek struct {
Reason string
}

// Error returns the error as string.
func (e ErrCantSeek) Error() string {
return fmt.Sprintf("s2: Can't seek because %s", e.Reason)
}

// DecodedLen returns the length of the decoded block.
@@ -101,7 +101,6 @@ func NewReader(r io.Reader, opts ...ReaderOption) *Reader {
nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize)
}
nr.paramsOK = true
return &nr
}

@@ -179,8 +178,7 @@ type Reader struct {
readHeader bool
paramsOK bool
snappyFrame bool
index *Index
}

// ensureBufferSize will ensure that the buffer can take at least n bytes.
@@ -604,15 +602,20 @@ type ReadSeeker struct {
// ReadSeeker will return an io.ReadSeeker compatible version of the reader.
// If 'random' is specified the returned io.Seeker can be used for
// random seeking, otherwise only forward seeking is supported.
// Enabling random seeking requires the original input to support
// the io.Seeker interface.
// A custom index can be specified which will be used if supplied.
// When using a custom index, it will not be read from the input stream.
// The returned ReadSeeker contains a shallow reference to the existing Reader,
// meaning changes performed to one are reflected in the other.
func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) {
// Read index if provided.
if len(index) != 0 {
if r.index == nil {
r.index = &Index{}
}
if _, err := r.index.Load(index); err != nil {
return nil, ErrCantSeek{Reason: "loading index returned: " + err.Error()}
}
}

@@ -622,41 +625,34 @@ func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error)
if !random {
return &ReadSeeker{Reader: r}, nil
}
return nil, ErrCantSeek{Reason: "input stream isn't seekable"}
}

if r.index != nil {
// Seekable and index, ok...
return &ReadSeeker{Reader: r}, nil
}

// Load from stream.
r.index = &Index{}

// Read current position.
pos, err := rs.Seek(0, io.SeekCurrent)
if err != nil {
return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
}
err = r.index.LoadStream(rs)
if err != nil {
if err == ErrUnsupported {
return nil, ErrCantSeek{Reason: "input stream does not contain an index"}
}
return nil, ErrCantSeek{Reason: "reading index returned: " + err.Error()}
}

// Reset position.
_, err = rs.Seek(pos, io.SeekStart)
if err != nil {
return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
}
return &ReadSeeker{Reader: r}, nil
}
@@ -705,7 +701,7 @@ func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
}

if offset < 0 {
offset = r.index.TotalUncompressed + offset
}

r.i = r.j // Remove rest of current block.
62 changes: 28 additions & 34 deletions s2/encode.go
@@ -407,8 +407,7 @@ type Writer struct {
writer io.Writer
randSrc io.Reader
writerWg sync.WaitGroup
index Index

// wroteStreamHeader is whether we have written the stream header.
wroteStreamHeader bool
@@ -463,12 +462,7 @@ func (w *Writer) Reset(writer io.Writer) {
w.written = 0
w.writer = writer
w.uncompWritten = 0
w.index.reset(w.blockSize)

// If we didn't get a writer, stop here.
if writer == nil {
@@ -502,7 +496,7 @@
err = io.ErrShortBuffer
}
_ = w.err(err)
w.err(w.index.add(w.written, input.startOffset))
w.written += int64(n)
}
}
@@ -1032,7 +1026,7 @@ func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
if n != len(obuf) {
return 0, w.err(io.ErrShortWrite)
}
w.err(w.index.add(w.written, w.uncompWritten))
w.written += int64(n)
w.uncompWritten += int64(len(uncompressed))

@@ -1091,34 +1085,45 @@ func (w *Writer) Flush() error {
}

// Close calls Flush and then closes the Writer.
// Calling Close multiple times is ok,
// but calling CloseIndex after this will make it not return the index.
func (w *Writer) Close() error {
_, err := w.closeIndex(w.appendIndex)
return err
}

// CloseIndex calls Close and returns an index on first call.
// This is not required if you are only adding index to a stream.
func (w *Writer) CloseIndex() ([]byte, error) {
return w.closeIndex(true)
}

func (w *Writer) closeIndex(idx bool) ([]byte, error) {
err := w.Flush()
if w.output != nil {
close(w.output)
w.writerWg.Wait()
w.output = nil
}

var index []byte
if w.err(nil) == nil && w.writer != nil {
// Create index.
if idx {
compSize := int64(-1)
if w.pad <= 1 {
compSize = w.written
}
index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize)
// Count as written for padding.
if w.appendIndex {
w.written += int64(len(index))
}
if true {
_, err := w.index.Load(index)
if err != nil {
panic(err)
}
}
}

@@ -1132,7 +1137,7 @@ func (w *Writer) Close() error {
add := calcSkippableFrame(w.written, int64(w.pad))
frame, err := skippableFrame(tmp, add, w.randSrc)
if err = w.err(err); err != nil {
return nil, err
}
n, err2 := w.writer.Write(frame)
if err2 == nil && n != len(frame) {
@@ -1150,9 +1155,9 @@
}
err = w.err(errClosed)
if err == errClosed {
return index, nil
}
return nil, err
}

// calcSkippableFrame will return a total size to be added for written
@@ -1218,17 +1223,6 @@ func WriterConcurrency(n int) WriterOption {
}
}

// WriterAddIndex will append an index to the end of a stream
// when it is closed.
func WriterAddIndex() WriterOption {