Skip to content

Commit

Permalink
reuse definition and repetition buffers (xitongsys#330)
Browse files Browse the repository at this point in the history
  • Loading branch information
Achille authored Aug 31, 2022
1 parent 2f8ef0d commit f812768
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 112 deletions.
96 changes: 35 additions & 61 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,60 +334,6 @@ func (b *buffer) clone() (clone *buffer) {
return clone
}

type bufferRef struct {
buf *buffer
off int
len int
}

func makeBufferRef(buf *buffer) (ref bufferRef) {
if buf != nil {
buf.ref()
ref.buf = buf
ref.len = len(buf.data)
}
return ref
}

func (r *bufferRef) data() []byte {
if r.buf != nil {
i := r.off
j := r.off + r.len
return r.buf.data[i:j:j]
}
return nil
}

func (r *bufferRef) ref() bufferRef {
if r.buf != nil {
r.buf.ref()
}
return *r
}

func (r *bufferRef) unref() {
if r.buf != nil {
r.buf.unref()
r.buf = nil
}
}

func (r *bufferRef) slice(i, j int) bufferRef {
if r.buf != nil {
r.buf.ref()
}
return bufferRef{buf: r.buf, off: r.off + i, len: j - i}
}

func (r *bufferRef) clone() (clone bufferRef) {
clone.off = r.off
clone.len = r.len
if r.buf != nil {
clone.buf = r.buf.clone()
}
return clone
}

type bufferPool struct {
pool sync.Pool
}
Expand Down Expand Up @@ -420,21 +366,49 @@ var (

type bufferedPage struct {
Page
values bufferRef
offsets bufferRef
values *buffer
offsets *buffer
repetitionLevels *buffer
definitionLevels *buffer
}

func (p *bufferedPage) Slice(i, j int64) Page {
bufferRef(p.values)
bufferRef(p.offsets)
bufferRef(p.definitionLevels)
bufferRef(p.repetitionLevels)

return &bufferedPage{
Page: p.Page.Slice(i, j),
values: p.values.ref(),
offsets: p.offsets.ref(),
values: p.values,
offsets: p.offsets,
definitionLevels: p.definitionLevels,
repetitionLevels: p.repetitionLevels,
Page: p.Page.Slice(i, j),
}
}

func unref(page Page) {
if p, _ := page.(*bufferedPage); p != nil {
p.values.unref()
p.offsets.unref()
bufferUnref(p.values)
bufferUnref(p.offsets)
bufferUnref(p.definitionLevels)
bufferUnref(p.repetitionLevels)

p.values = nil
p.offsets = nil
p.definitionLevels = nil
p.repetitionLevels = nil
}
}

func bufferRef(buf *buffer) {
if buf != nil {
buf.ref()
}
}

func bufferUnref(buf *buffer) {
if buf != nil {
buf.unref()
}
}
21 changes: 15 additions & 6 deletions column.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,21 +646,30 @@ func (c *Column) decodeDataPage(header DataPageHeader, numValues int, repetition
newPage,
c.maxRepetitionLevel,
c.maxDefinitionLevel,
makeBufferRef(repetitionLevels),
makeBufferRef(definitionLevels),
repetitionLevels.data,
definitionLevels.data,
)
case c.maxDefinitionLevel > 0:
newPage = newOptionalPage(
newPage,
c.maxDefinitionLevel,
makeBufferRef(definitionLevels),
definitionLevels.data,
)
}

bufferRef(vbuf)
bufferRef(obuf)
bufferRef(repetitionLevels)
bufferRef(definitionLevels)

newPage = &bufferedPage{
Page: newPage,
values: makeBufferRef(vbuf),
offsets: makeBufferRef(obuf),
Page: newPage,
values: vbuf,
offsets: obuf,
repetitionLevels: repetitionLevels,
definitionLevels: definitionLevels,
}

return newPage, nil
}

Expand Down
9 changes: 3 additions & 6 deletions column_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ func (col *optionalColumnBuffer) Page() Page {
col.reordered = false
}

definitionLevels := makeBufferRef(&buffer{data: col.definitionLevels})
return newOptionalPage(col.base.Page(), col.maxDefinitionLevel, definitionLevels)
return newOptionalPage(col.base.Page(), col.maxDefinitionLevel, col.definitionLevels)
}

func (col *optionalColumnBuffer) Reset() {
Expand Down Expand Up @@ -551,14 +550,12 @@ func (col *repeatedColumnBuffer) Page() Page {
col.reordered = false
}

repetitionLevels := makeBufferRef(&buffer{data: col.repetitionLevels})
definitionLevels := makeBufferRef(&buffer{data: col.definitionLevels})
return newRepeatedPage(
col.base.Page(),
col.maxRepetitionLevel,
col.maxDefinitionLevel,
repetitionLevels,
definitionLevels,
col.repetitionLevels,
col.definitionLevels,
)
}

Expand Down
72 changes: 36 additions & 36 deletions page.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,10 @@ func errPageBoundsOutOfRange(i, j, n int64) error {
type optionalPage struct {
base Page
maxDefinitionLevel byte
definitionLevels bufferRef
definitionLevels []byte
}

func newOptionalPage(base Page, maxDefinitionLevel byte, definitionLevels bufferRef) *optionalPage {
func newOptionalPage(base Page, maxDefinitionLevel byte, definitionLevels []byte) *optionalPage {
return &optionalPage{
base: base,
maxDefinitionLevel: maxDefinitionLevel,
Expand All @@ -393,21 +393,21 @@ func (page *optionalPage) Column() int { return page.base.Column() }

func (page *optionalPage) Dictionary() Dictionary { return page.base.Dictionary() }

func (page *optionalPage) NumRows() int64 { return int64(page.definitionLevels.len) }
func (page *optionalPage) NumRows() int64 { return int64(len(page.definitionLevels)) }

func (page *optionalPage) NumValues() int64 { return int64(page.definitionLevels.len) }
func (page *optionalPage) NumValues() int64 { return int64(len(page.definitionLevels)) }

func (page *optionalPage) NumNulls() int64 {
return int64(countLevelsNotEqual(page.definitionLevels.data(), page.maxDefinitionLevel))
return int64(countLevelsNotEqual(page.definitionLevels, page.maxDefinitionLevel))
}

func (page *optionalPage) Bounds() (min, max Value, ok bool) { return page.base.Bounds() }

func (page *optionalPage) Size() int64 { return page.base.Size() + int64(page.definitionLevels.len) }
func (page *optionalPage) Size() int64 { return page.base.Size() + int64(len(page.definitionLevels)) }

func (page *optionalPage) RepetitionLevels() []byte { return nil }

func (page *optionalPage) DefinitionLevels() []byte { return page.definitionLevels.data() }
func (page *optionalPage) DefinitionLevels() []byte { return page.definitionLevels }

func (page *optionalPage) Data() encoding.Values { return page.base.Data() }

Expand All @@ -422,30 +422,31 @@ func (page *optionalPage) Clone() Page {
return newOptionalPage(
page.base.Clone(),
page.maxDefinitionLevel,
page.definitionLevels.clone(),
copyBytes(page.definitionLevels),
)
}

func (page *optionalPage) Slice(i, j int64) Page {
definitionLevels := page.definitionLevels.data()
numNulls1 := int64(countLevelsNotEqual(definitionLevels[:i], page.maxDefinitionLevel))
numNulls2 := int64(countLevelsNotEqual(definitionLevels[i:j], page.maxDefinitionLevel))
maxDefinitionLevel := page.maxDefinitionLevel
definitionLevels := page.definitionLevels
numNulls1 := int64(countLevelsNotEqual(definitionLevels[:i], maxDefinitionLevel))
numNulls2 := int64(countLevelsNotEqual(definitionLevels[i:j], maxDefinitionLevel))
return newOptionalPage(
page.base.Slice(i-numNulls1, j-(numNulls1+numNulls2)),
page.maxDefinitionLevel,
page.definitionLevels.slice(int(i), int(j)),
maxDefinitionLevel,
definitionLevels[i:j:j],
)
}

type repeatedPage struct {
base Page
maxRepetitionLevel byte
maxDefinitionLevel byte
definitionLevels bufferRef
repetitionLevels bufferRef
definitionLevels []byte
repetitionLevels []byte
}

func newRepeatedPage(base Page, maxRepetitionLevel, maxDefinitionLevel byte, repetitionLevels, definitionLevels bufferRef) *repeatedPage {
func newRepeatedPage(base Page, maxRepetitionLevel, maxDefinitionLevel byte, repetitionLevels, definitionLevels []byte) *repeatedPage {
return &repeatedPage{
base: base,
maxRepetitionLevel: maxRepetitionLevel,
Expand All @@ -461,27 +462,23 @@ func (page *repeatedPage) Column() int { return page.base.Column() }

func (page *repeatedPage) Dictionary() Dictionary { return page.base.Dictionary() }

func (page *repeatedPage) NumRows() int64 {
return int64(countLevelsEqual(page.repetitionLevels.data(), 0))
}
func (page *repeatedPage) NumRows() int64 { return int64(countLevelsEqual(page.repetitionLevels, 0)) }

func (page *repeatedPage) NumValues() int64 {
return int64(page.definitionLevels.len)
}
func (page *repeatedPage) NumValues() int64 { return int64(len(page.definitionLevels)) }

func (page *repeatedPage) NumNulls() int64 {
return int64(countLevelsNotEqual(page.definitionLevels.data(), page.maxDefinitionLevel))
return int64(countLevelsNotEqual(page.definitionLevels, page.maxDefinitionLevel))
}

func (page *repeatedPage) Bounds() (min, max Value, ok bool) { return page.base.Bounds() }

func (page *repeatedPage) Size() int64 {
return int64(page.repetitionLevels.len) + int64(page.definitionLevels.len) + page.base.Size()
return int64(len(page.repetitionLevels)) + int64(len(page.definitionLevels)) + page.base.Size()
}

func (page *repeatedPage) RepetitionLevels() []byte { return page.repetitionLevels.data() }
func (page *repeatedPage) RepetitionLevels() []byte { return page.repetitionLevels }

func (page *repeatedPage) DefinitionLevels() []byte { return page.definitionLevels.data() }
func (page *repeatedPage) DefinitionLevels() []byte { return page.definitionLevels }

func (page *repeatedPage) Data() encoding.Values { return page.base.Data() }

Expand All @@ -497,8 +494,8 @@ func (page *repeatedPage) Clone() Page {
page.base.Clone(),
page.maxRepetitionLevel,
page.maxDefinitionLevel,
page.repetitionLevels.clone(),
page.definitionLevels.clone(),
copyBytes(page.repetitionLevels),
copyBytes(page.definitionLevels),
)
}

Expand All @@ -514,7 +511,11 @@ func (page *repeatedPage) Slice(i, j int64) Page {
panic(errPageBoundsOutOfRange(i, j, numRows))
}

repetitionLevels := page.repetitionLevels.data()
maxRepetitionLevel := page.maxRepetitionLevel
maxDefinitionLevel := page.maxDefinitionLevel
repetitionLevels := page.repetitionLevels
definitionLevels := page.definitionLevels

rowIndex0 := 0
rowIndex1 := len(repetitionLevels)
rowIndex2 := len(repetitionLevels)
Expand All @@ -539,19 +540,18 @@ func (page *repeatedPage) Slice(i, j int64) Page {
}
}

definitionLevels := page.definitionLevels.data()
numNulls1 := countLevelsNotEqual(definitionLevels[:rowIndex1], page.maxDefinitionLevel)
numNulls2 := countLevelsNotEqual(definitionLevels[rowIndex1:rowIndex2], page.maxDefinitionLevel)
numNulls1 := countLevelsNotEqual(definitionLevels[:rowIndex1], maxDefinitionLevel)
numNulls2 := countLevelsNotEqual(definitionLevels[rowIndex1:rowIndex2], maxDefinitionLevel)

i = int64(rowIndex1 - numNulls1)
j = int64(rowIndex2 - (numNulls1 + numNulls2))

return newRepeatedPage(
page.base.Slice(i, j),
page.maxRepetitionLevel,
page.maxDefinitionLevel,
page.repetitionLevels.slice(int(rowIndex1), int(rowIndex2)),
page.definitionLevels.slice(int(rowIndex1), int(rowIndex2)),
maxRepetitionLevel,
maxDefinitionLevel,
repetitionLevels[rowIndex1:rowIndex2:rowIndex2],
definitionLevels[rowIndex1:rowIndex2:rowIndex2],
)
}

Expand Down
6 changes: 3 additions & 3 deletions page_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type optionalPageValues struct {

func (r *optionalPageValues) ReadValues(values []Value) (n int, err error) {
maxDefinitionLevel := r.page.maxDefinitionLevel
definitionLevels := r.page.definitionLevels.data()
definitionLevels := r.page.definitionLevels
columnIndex := ^int16(r.page.Column())

for n < len(values) && r.offset < len(definitionLevels) {
Expand Down Expand Up @@ -64,8 +64,8 @@ type repeatedPageValues struct {

func (r *repeatedPageValues) ReadValues(values []Value) (n int, err error) {
maxDefinitionLevel := r.page.maxDefinitionLevel
definitionLevels := r.page.definitionLevels.data()
repetitionLevels := r.page.repetitionLevels.data()
definitionLevels := r.page.definitionLevels
repetitionLevels := r.page.repetitionLevels
columnIndex := ^int16(r.page.Column())

for n < len(values) && r.offset < len(definitionLevels) {
Expand Down

0 comments on commit f812768

Please sign in to comment.