Skip to content

Commit

Permalink
don't copy byte-array values (xitongsys#329)
Browse files Browse the repository at this point in the history
* don't copy byte-array values

* remove unnecessary call to clearValues

* PR feedback
  • Loading branch information
Achille authored Aug 31, 2022
1 parent d179e9d commit 2f8ef0d
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 284 deletions.
238 changes: 41 additions & 197 deletions column_chunk.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package parquet

import (
"errors"
"io"
)

Expand Down Expand Up @@ -40,169 +39,7 @@ type pageAndValueWriter interface {
ValueWriter
}

// columnChunkReader reads the values of a column chunk one page at a time.
// Values are loaded from the current page into buffer by readValues; offset
// tracks the position of the next buffered value that has not been consumed.
type columnChunkReader struct {
// These two fields must be configured to initialize the reader.
// NOTE(review): presumably "these two" refers to buffer (readValues fills it
// up to its capacity) and reader (the source of pages); the other fields are
// assigned by readValues and cleared by reset -- confirm against callers.
buffer []Value // buffer holding values read from the pages
offset int // offset of the next value in the buffer
reader Pages // reader of column pages
values ValueReader // reader for values from the current page
page Page // current page
}

// buffered reports how many values currently held in the buffer are still
// waiting to be consumed, i.e. the buffer length minus the read offset.
func (r *columnChunkReader) buffered() int {
	total, consumed := len(r.buffer), r.offset
	return total - consumed
}

// reset returns the reader to its empty state: the buffered values are passed
// to clearValues and the buffer is truncated (its capacity is retained), the
// read offset goes back to zero, the value reader is dropped, and the current
// page is handed to unref before being forgotten. The underlying page reader
// is left untouched.
func (r *columnChunkReader) reset() {
	// Clear before truncating so that every previously buffered value is
	// covered by the call.
	clearValues(r.buffer)
	r.buffer, r.offset, r.values = r.buffer[:0], 0, nil

	unref(r.page)
	r.page = nil
}

// close resets the reader state and then closes the underlying page reader,
// returning whatever error that Close call reports.
func (r *columnChunkReader) close() (err error) {
	r.reset()
	err = r.reader.Close()
	return err
}

// seekToRow discards all buffered state and repositions the underlying page
// reader at rowIndex, returning the error reported by the page reader, if any.
func (r *columnChunkReader) seekToRow(rowIndex int64) error {
	// TODO: there are a few optimizations we can make here:
	// * is the row buffered already? => advance the offset
	// * is the row in the current page? => seek in values
	r.reset()
	err := r.reader.SeekToRow(rowIndex)
	return err
}

// readValues refills the buffer when every buffered value has been consumed.
//
// It is a no-op while unconsumed values remain. Otherwise, if no value reader
// is active, pages are read until one reporting at least one value is found,
// and values are then read into the full capacity of the buffer. An io.EOF
// from the value reader clears it so the next call moves on to another page.
// The error from the value reader is only returned when no values were read;
// errors from reading pages are returned as is.
func (r *columnChunkReader) readValues() error {
	// Values are still pending in the buffer: nothing to load.
	if len(r.buffer) > r.offset {
		return nil
	}

	// No active value reader: advance through the pages, releasing the
	// previous one each time, until a page that holds values shows up.
	for r.values == nil {
		unref(r.page)
		r.page = nil

		next, err := r.reader.ReadPage()
		if err != nil {
			return err
		}
		r.page = next

		if next.NumValues() == 0 {
			continue
		}
		r.values = next.Values()
		break
	}

	count, readErr := r.values.ReadValues(r.buffer[:cap(r.buffer)])
	if errors.Is(readErr, io.EOF) {
		// The current page is exhausted; a new one is picked on the next call.
		r.values = nil
	}
	r.buffer, r.offset = r.buffer[:count], 0

	// Values were produced, so any error is masked for this call.
	if count > 0 {
		return nil
	}
	return readErr
}

/*
func (r *columnChunkReader) writeBufferedRowsTo(w pageAndValueWriter, rowCount int64) (numRows int64, err error) {
if rowCount == 0 {
return 0, nil
}
for {
for r.offset < len(r.buffer) {
values := r.buffer[r.offset:]
// We can only determine that the full row has been consumed if we
// have more values in the buffer, and the next value is the start
// of a new row. Otherwise, we have to load more values from the
// page, which may yield EOF if all values have been consumed, in
// which case we know that we have read the full row, and otherwise
// we will enter this check again on the next loop iteration.
if numRows == rowCount {
if values[0].repetitionLevel == 0 {
return numRows, nil
}
values, _ = splitRowValues(values)
} else {
values = limitRowValues(values, int(rowCount-numRows))
}
n, err := w.WriteValues(values)
numRows += int64(countRowsOf(values[:n]))
r.offset += n
if err != nil {
return numRows, err
}
}
if err := r.readValuesFromCurrentPage(); err != nil {
if err == io.EOF {
err = nil
}
return numRows, err
}
}
}
func (r *columnChunkReader) writeRowsTo(w pageAndValueWriter, limit int64) (numRows int64, err error) {
for numRows < limit {
if r.values != nil {
n, err := r.writeBufferedRowsTo(w, numRows-limit)
numRows += n
if err != nil || numRows == limit {
return numRows, err
}
}
r.buffer = r.buffer[:0]
r.offset = 0
for numRows < limit {
p, err := r.reader.ReadPage()
if err != nil {
return numRows, err
}
pageRows := int64(p.NumRows())
// When the page is fully contained in the remaining range of rows
// that we intend to copy, we can use an optimized page copy rather
// than writing rows one at a time.
//
// Data pages v1 do not expose the number of rows available, which
// means we cannot take the optimized page copy path in those cases.
if pageRows == 0 || int64(pageRows) > limit {
r.values = p.Values()
err := r.readValuesFromCurrentPage()
if err == nil {
// More values have been buffered, break out of the inner loop
// to go back to the beginning of the outer loop and write
// buffered values to the output.
break
}
if errors.Is(err, io.EOF) {
// The page contained no values? Unclear if this is valid but
// we can handle it by reading the next page.
r.values = nil
continue
}
return numRows, err
}
if _, err := w.WritePage(p); err != nil {
return numRows, err
}
numRows += pageRows
}
}
return numRows, nil
}
*/

type readRowsFunc func([]Row, byte, []columnChunkReader) (int, error)
type readRowsFunc func(*rowGroupRows, []Row, byte) (int, error)

func readRowsFuncOf(node Node, columnIndex int, repetitionDepth byte) (int, readRowsFunc) {
var read readRowsFunc
Expand All @@ -226,7 +63,7 @@ func readRowsFuncOf(node Node, columnIndex int, repetitionDepth byte) (int, read

//go:noinline
func readRowsFuncOfRepeated(read readRowsFunc, repetitionDepth byte) readRowsFunc {
return func(rows []Row, repetitionLevel byte, columns []columnChunkReader) (int, error) {
return func(r *rowGroupRows, rows []Row, repetitionLevel byte) (int, error) {
for i := range rows {
// Repeated columns have variable number of values, we must process
// them one row at a time because we cannot predict how many values
Expand All @@ -235,7 +72,7 @@ func readRowsFuncOfRepeated(read readRowsFunc, repetitionDepth byte) readRowsFun

// The first pass looks for values marking the beginning of a row by
// having a repetition level equal to the current level.
n, err := read(row, repetitionLevel, columns)
n, err := read(r, row, repetitionLevel)
if err != nil {
// The error here may likely be io.EOF, the read function may
// also have successfully read a row, which is indicated by a
Expand All @@ -262,7 +99,7 @@ func readRowsFuncOfRepeated(read readRowsFunc, repetitionDepth byte) readRowsFun
// repeated values until we get the indication that we consumed
// them all (the read function returns zero and no errors).
for {
n, err := read(row, repetitionDepth, columns)
n, err := read(r, row, repetitionDepth)
if err != nil {
return i + 1, err
}
Expand All @@ -280,7 +117,7 @@ func readRowsFuncOfGroup(node Node, columnIndex int, repetitionDepth byte) (int,
fields := node.Fields()

if len(fields) == 0 {
return columnIndex, func(_ []Row, _ byte, _ []columnChunkReader) (int, error) {
return columnIndex, func(*rowGroupRows, []Row, byte) (int, error) {
return 0, io.EOF
}
}
Expand All @@ -298,10 +135,10 @@ func readRowsFuncOfGroup(node Node, columnIndex int, repetitionDepth byte) (int,
columnIndex, group[i] = readRowsFuncOf(fields[i], columnIndex, repetitionDepth)
}

return columnIndex, func(rows []Row, repetitionLevel byte, columns []columnChunkReader) (int, error) {
return columnIndex, func(r *rowGroupRows, rows []Row, repetitionLevel byte) (int, error) {
// When reading a group, we use the first column as an indicator of how
// many rows can be read during this call.
n, err := group[0](rows, repetitionLevel, columns)
n, err := group[0](r, rows, repetitionLevel)

if n > 0 {
// Read values for all rows that the group is able to consume.
Expand All @@ -310,8 +147,8 @@ func readRowsFuncOfGroup(node Node, columnIndex int, repetitionDepth byte) (int,
// be more to read in other columns, therefore we must always read
// all columns and cannot stop on the first error.
for _, read := range group[1:] {
_, err2 := read(rows[:n], repetitionLevel, columns)
if err2 != nil && !errors.Is(err2, io.EOF) {
_, err2 := read(r, rows[:n], repetitionLevel)
if err2 != nil && err2 != io.EOF {
return 0, err2
}
}
Expand All @@ -326,46 +163,53 @@ func readRowsFuncOfLeaf(columnIndex int, repetitionDepth byte) (int, readRowsFun
var read readRowsFunc

if repetitionDepth == 0 {
read = func(rows []Row, _ byte, columns []columnChunkReader) (int, error) {
read = func(r *rowGroupRows, rows []Row, _ byte) (int, error) {
// When the repetition depth is zero, we know that there is exactly
// one value per row for this column, and therefore we can consume
// as many values as there are rows to fill.
col := &columns[columnIndex]

for n := 0; n < len(rows); {
if col.offset < len(col.buffer) {
rows[n] = append(rows[n], col.buffer[col.offset])
n++
col.offset++
continue
}
if err := col.readValues(); err != nil {
return n, err
col := &r.columns[columnIndex]
buf := r.buffer(columnIndex)

for i := range rows {
if col.offset == col.length {
n, err := col.values.ReadValues(buf)
col.offset = 0
col.length = int32(n)
if n == 0 && err != nil {
return 0, err
}
}

rows[i] = append(rows[i], buf[col.offset])
col.offset++
}

return len(rows), nil
}
} else {
read = func(rows []Row, repetitionLevel byte, columns []columnChunkReader) (int, error) {
read = func(r *rowGroupRows, rows []Row, repetitionLevel byte) (int, error) {
// When the repetition depth is not zero, we know that we will be
// called with a single row as input. We attempt to read at most one
// value of a single row and return to the caller.
col := &columns[columnIndex]

for {
if col.offset < len(col.buffer) {
if col.buffer[col.offset].repetitionLevel != repetitionLevel {
return 0, nil
}
rows[0] = append(rows[0], col.buffer[col.offset])
col.offset++
return 1, nil
}
if err := col.readValues(); err != nil {
col := &r.columns[columnIndex]
buf := r.buffer(columnIndex)

if col.offset == col.length {
n, err := col.values.ReadValues(buf)
col.offset = 0
col.length = int32(n)
if n == 0 && err != nil {
return 0, err
}
}

if buf[col.offset].repetitionLevel != repetitionLevel {
return 0, nil
}

rows[0] = append(rows[0], buf[col.offset])
col.offset++
return 1, nil
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/unsafecast/unsafecast_go18.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func AddressOf[T any](data []T) *T {

// AddressOfBytes returns the address of the first byte in data.
func AddressOfBytes(data []byte) *byte {
return AddressOf(data)
return *(**byte)(unsafe.Pointer(&data))
}

// AddressOfString returns the address of the first byte in data.
Expand Down
11 changes: 10 additions & 1 deletion page_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,9 @@ func TestOptionalPagePreserveIndex(t *testing.T) {
defer rows.Close()

rowbuf := make([]parquet.Row, 2)

n, err := rows.ReadRows(rowbuf)
if err != io.EOF {
if err != nil && err != io.EOF {
t.Fatal("reading rows:", err)
}
if n != 1 {
Expand All @@ -447,6 +448,14 @@ func TestOptionalPagePreserveIndex(t *testing.T) {
if rowbuf[0][0].Column() != 0 {
t.Errorf("wrong index: got=%d want=%d", rowbuf[0][0].Column(), 0)
}

n, err = rows.ReadRows(rowbuf)
if err != io.EOF {
t.Fatal("reading EOF:", err)
}
if n != 0 {
t.Fatal("expected no more rows after EOF:", n)
}
}

func TestRepeatedPageTrailingNulls(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions page_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,7 @@ func (r *byteArrayPageValues) readByteArrays(values []byte) (c, n int, err error
func (r *byteArrayPageValues) ReadValues(values []Value) (n int, err error) {
numValues := r.page.len()
for n < len(values) && r.offset < numValues {
value := r.page.index(r.offset)
values[n] = r.page.makeValueBytes(copyBytes(value))
values[n] = r.page.makeValueBytes(r.page.index(r.offset))
r.offset++
n++
}
Expand Down Expand Up @@ -372,7 +371,7 @@ func (r *fixedLenByteArrayPageValues) ReadFixedLenByteArrays(values []byte) (n i

func (r *fixedLenByteArrayPageValues) ReadValues(values []Value) (n int, err error) {
for n < len(values) && r.offset < len(r.page.data) {
values[n] = r.page.makeValueBytes(copyBytes(r.page.data[r.offset : r.offset+r.page.size]))
values[n] = r.page.makeValueBytes(r.page.data[r.offset : r.offset+r.page.size])
r.offset += r.page.size
n++
}
Expand Down
4 changes: 4 additions & 0 deletions row.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ type RowReader interface {
// each row read from the reader. If the rows are not nil, the backing array
// of the slices will be used as an optimization to avoid re-allocating new
// arrays.
//
// The application is expected to handle the case where ReadRows returns
// fewer rows than requested and no error, by looking at the first returned
// value from ReadRows, which is the number of rows that were read.
ReadRows([]Row) (int, error)
}

Expand Down
Loading

0 comments on commit 2f8ef0d

Please sign in to comment.