Skip to content

read data written with partial flush #996

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 55 additions & 19 deletions flate/inflate.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,14 @@ const (
huffmanGenericReader
)

// flushMode tells decompressor when to return data
type flushMode uint8

const (
syncFlush flushMode = iota // return data after sync flush block
partialFlush // return data after each block
)

// Decompress state.
type decompressor struct {
// Input source.
Expand Down Expand Up @@ -332,6 +340,8 @@ type decompressor struct {

nb uint
final bool

flushMode flushMode
}

func (f *decompressor) nextBlock() {
Expand Down Expand Up @@ -618,7 +628,10 @@ func (f *decompressor) dataBlock() {
}

if n == 0 {
f.toRead = f.dict.readFlush()
if f.flushMode == syncFlush {
f.toRead = f.dict.readFlush()
}

f.finishBlock()
return
}
Expand Down Expand Up @@ -657,8 +670,12 @@ func (f *decompressor) finishBlock() {
if f.dict.availRead() > 0 {
f.toRead = f.dict.readFlush()
}

f.err = io.EOF
} else if f.flushMode == partialFlush && f.dict.availRead() > 0 {
f.toRead = f.dict.readFlush()
}

f.step = nextBlock
}

Expand Down Expand Up @@ -789,15 +806,25 @@ func (f *decompressor) Reset(r io.Reader, dict []byte) error {
return nil
}

// NewReader returns a new ReadCloser that can be used
// to read the uncompressed version of r.
// If r does not also implement io.ByteReader,
// the decompressor may read more data than necessary from r.
// It is the caller's responsibility to call Close on the ReadCloser
// when finished reading.
//
// The ReadCloser returned by NewReader also implements Resetter.
func NewReader(r io.Reader) io.ReadCloser {
type ReaderOpt func(*decompressor)

// WithPartialBlock tells decompressor to return after each block,
// so it can read data written with partial flush
func WithPartialBlock() ReaderOpt {
return func(f *decompressor) {
f.flushMode = partialFlush
}
}

// WithDict initializes the reader with a preset dictionary
func WithDict(dict []byte) ReaderOpt {
return func(f *decompressor) {
f.dict.init(maxMatchOffset, dict)
}
}

// NewReaderOpts returns new reader with provided options
func NewReaderOpts(r io.Reader, opts ...ReaderOpt) io.ReadCloser {
fixedHuffmanDecoderInit()

var f decompressor
Expand All @@ -806,9 +833,26 @@ func NewReader(r io.Reader) io.ReadCloser {
f.codebits = new([numCodes]int)
f.step = nextBlock
f.dict.init(maxMatchOffset, nil)

for _, opt := range opts {
opt(&f)
}

return &f
}

// NewReader returns a new ReadCloser that can be used
// to read the uncompressed version of r.
// If r does not also implement io.ByteReader,
// the decompressor may read more data than necessary from r.
// It is the caller's responsibility to call Close on the ReadCloser
// when finished reading.
//
// The ReadCloser returned by NewReader also implements Resetter.
func NewReader(r io.Reader) io.ReadCloser {
return NewReaderOpts(r)
}

// NewReaderDict is like NewReader but initializes the reader
// with a preset dictionary. The returned Reader behaves as if
// the uncompressed data stream started with the given dictionary,
Expand All @@ -817,13 +861,5 @@ func NewReader(r io.Reader) io.ReadCloser {
//
// The ReadCloser returned by NewReader also implements Resetter.
func NewReaderDict(r io.Reader, dict []byte) io.ReadCloser {
fixedHuffmanDecoderInit()

var f decompressor
f.r = makeReader(r)
f.bits = new([maxNumLit + maxNumDist]int)
f.codebits = new([numCodes]int)
f.step = nextBlock
f.dict.init(maxMatchOffset, dict)
return &f
return NewReaderOpts(r, WithDict(dict))
}
21 changes: 21 additions & 0 deletions flate/inflate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"crypto/rand"
"io"
"os"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -279,3 +280,23 @@ func TestWriteTo(t *testing.T) {
t.Fatal("output did not match input")
}
}

func TestReaderPartialBlock(t *testing.T) {
data, err := os.ReadFile("testdata/partial-block")
if err != nil {
t.Error(err)
}

r := NewReaderOpts(bytes.NewReader(data), WithPartialBlock())
rb := make([]byte, 32)
n, err := r.Read(rb)
if err != nil {
t.Fatalf("Read: %v", err)
}

expected := "hello, world"
actual := string(rb[:n])
if expected != actual {
t.Fatalf("expected: %v, got: %v", expected, actual)
}
}
1 change: 1 addition & 0 deletions flate/testdata/partial-block
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ÊHÍÉÉ×Q(Ï/ÊI
Loading