Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 57 additions & 6 deletions internal/utils/buf_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"errors"
"fmt"
"io"

"github.com/apache/arrow-go/v18/arrow/memory"
)

type Reader interface {
Expand All @@ -38,6 +40,7 @@ type byteReader struct {

// NewByteReader creates a new ByteReader instance from the given byte slice.
// It wraps the bytes.NewReader function to implement BufferedReader interface.
// It is considered not to own the underlying byte slice.
func NewByteReader(buf []byte) *byteReader {
r := bytes.NewReader(buf)
return &byteReader{
Expand Down Expand Up @@ -108,10 +111,52 @@ func (r *byteReader) Reset(Reader) {}

func (r *byteReader) BufferSize() int { return len(r.buf) }

func (r *byteReader) Buffered() int { return len(r.buf) - r.pos }

func (r *byteReader) Free() {
r.r = nil
r.buf = nil
r.pos = 0
}

// bytesBufferReader is a byte slice with a bytes reader wrapped around it.
// It uses an allocator to allocate and free the underlying byte slice.
type bytesBufferReader struct {
alloc memory.Allocator
byteReader
}

// NewBytesBufferReader creates a new bytesBufferReader with the given size and allocator.
func NewBytesBufferReader(size int, alloc memory.Allocator) *bytesBufferReader {
if alloc == nil {
alloc = memory.DefaultAllocator
}
buf := alloc.Allocate(size)
return &bytesBufferReader{
alloc: alloc,
byteReader: byteReader{
bytes.NewReader(buf),
buf,
0,
},
}
}

// Outer returns the underlying byte slice.
func (r *bytesBufferReader) Buffer() []byte {
return r.buf
}

// Free releases the underlying byte slice back to the allocator.
func (r *bytesBufferReader) Free() {
r.alloc.Free(r.buf)
}

// bufferedReader is similar to bufio.Reader except
// it will expand the buffer if necessary when asked to Peek
// more bytes than are in the buffer
type bufferedReader struct {
alloc memory.Allocator // allocator used to allocate the buffer
bufferSz int
buf []byte
r, w int
Expand All @@ -122,9 +167,13 @@ type bufferedReader struct {
// NewBufferedReader returns a buffered reader with similar semantics to bufio.Reader
// except Peek will expand the internal buffer if needed rather than return
// an error.
func NewBufferedReader(rd Reader, sz int) *bufferedReader {
func NewBufferedReader(rd Reader, sz int, alloc memory.Allocator) *bufferedReader {
if alloc == nil {
alloc = memory.DefaultAllocator
}
r := &bufferedReader{
rd: rd,
alloc: alloc,
rd: rd,
}
r.resizeBuffer(sz)
return r
Expand All @@ -140,11 +189,9 @@ func (b *bufferedReader) Reset(rd Reader) {

func (b *bufferedReader) resetBuffer() {
if b.buf == nil {
b.buf = make([]byte, b.bufferSz)
b.buf = b.alloc.Allocate(b.bufferSz)
} else if b.bufferSz > cap(b.buf) {
buf := b.buf
b.buf = make([]byte, b.bufferSz)
copy(b.buf, buf)
b.buf = b.alloc.Reallocate(b.bufferSz, b.buf)
} else {
b.buf = b.buf[:b.bufferSz]
}
Expand Down Expand Up @@ -298,3 +345,7 @@ func (b *bufferedReader) Read(p []byte) (n int, err error) {
b.r += n
return n, nil
}

func (b *bufferedReader) Free() {
b.alloc.Free(b.buf)
}
Loading