Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: add PreAlloc4Row and Insert for Chunk and List #7916

Merged
merged 13 commits into from
Oct 18, 2018
58 changes: 58 additions & 0 deletions util/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package chunk

import (
"encoding/binary"
"reflect"
"unsafe"

"github.com/cznic/mathutil"
Expand Down Expand Up @@ -277,6 +278,63 @@ func (c *Chunk) AppendPartialRow(colIdx int, row Row) {
}
}

// PreAlloc4Row pre-allocates the memory space for a Row.
// Nothing except for the nullBitMap of c.columns will be pre-written.
func (c *Chunk) PreAlloc4Row(row Row) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we preallocate the memory for a batch of rows? The memory grow stratagem may allocate a lot of unused memory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Preallocate memory for a batch of rows cannot avoid the waste problem brought by memory usage increment of chunk.data.
    Unless the batch contains all of the rows which should be pre-allocated.

  2. I adjust the strategy according to https://github.com/golang/go/blob/master/src/runtime/slice.go#L116-L135

Copy link
Member

@zz-jason zz-jason Oct 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about:

  1. s/PreAlloc4Row/PreAlloc/
  2. change the comment to:
// PreAlloc pre-allocates the memory space in a Chunk to store the Row.
// NOTE:
// 1. The Chunk must be empty or holds no useful data.
// 2. The schema of the Row must be the same with the Chunk.
// 3. This API is paired with the `Insert()` function, which inserts all the
//    rows into the Chunk after the pre-allocation.

for i, srcCol := range row.c.columns {
dstCol := c.columns[i]
dstCol.appendNullBitmap(!srcCol.isNull(row.idx))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to only pre-allocate the memory for null bitmap by calling dstCol.appendNullBitmap(true) and set the null bit mask in the insert function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We write the nullBitMap info here is because that,
Insert will be called parallelly later, every element in nullBitMap is a byte .
It will cause data race problem if we write the bit mask in Insert.

elemLen := len(srcCol.elemBuf)
if !srcCol.isFixed() {
elemLen = int(srcCol.offsets[row.idx+1] - srcCol.offsets[row.idx])
dstCol.offsets = append(dstCol.offsets, int32(len(dstCol.data)+elemLen))
}
dstCol.length++
needCap := len(dstCol.data) + elemLen
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
if needCap <= cap(dstCol.data) {
(*reflect.SliceHeader)(unsafe.Pointer(&dstCol.data)).Len = len(dstCol.data) + elemLen
continue
}
// Grow the capacity according to golang.growslice.
newCap := cap(dstCol.data)
doubleCap := newCap << 1
if needCap > doubleCap {
newCap = needCap
} else {
if len(dstCol.data) < 1024 {
newCap = doubleCap
} else {
for 0 < newCap && newCap < needCap {
newCap += newCap / 4
}
if newCap <= 0 {
newCap = needCap
}
}
}
dstCol.data = make([]byte, len(dstCol.data)+elemLen, newCap)
}
}

// Insert inserts `row` on the position specified by `rowIdx`.
// Note: Insert will cover the origin data, it should be called after
// PreAlloc4Row.
func (c *Chunk) Insert(rowIdx int, row Row) {
for i, srcCol := range row.c.columns {
dstCol := c.columns[i]
var srcStart, srcEnd, destStart, destEnd int
if srcCol.isFixed() {
srcElemLen, destElemLen := len(srcCol.elemBuf), len(dstCol.elemBuf)
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
srcStart, destStart = row.idx*srcElemLen, rowIdx*destElemLen
srcEnd, destEnd = srcStart+srcElemLen, destStart+destElemLen
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
} else {
srcStart, srcEnd = int(srcCol.offsets[row.idx]), int(srcCol.offsets[row.idx+1])
destStart, destEnd = int(dstCol.offsets[rowIdx]), int(dstCol.offsets[rowIdx+1])
}
copy(dstCol.data[destStart:destEnd], srcCol.data[srcStart:srcEnd])
}
}

// Append appends rows in [begin, end) in another Chunk to a Chunk.
func (c *Chunk) Append(other *Chunk, begin, end int) {
for colID, src := range other.columns {
Expand Down
103 changes: 103 additions & 0 deletions util/chunk/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"fmt"
"math"
"strconv"
"strings"
"sync"
"testing"
"time"
"unsafe"
Expand Down Expand Up @@ -517,6 +519,107 @@ func (s *testChunkSuite) TestSwapColumn(c *check.C) {
checkRef()
}

func (s *testChunkSuite) TestPreAlloc4RowAndInsert(c *check.C) {
fieldTypes := make([]*types.FieldType, 0, 4)
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeLonglong})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeNewDecimal})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeVarchar})

srcChk := NewChunkWithCapacity(fieldTypes, 10)
for i := int64(0); i < 10; i++ {
srcChk.AppendFloat32(0, float32(i))
srcChk.AppendInt64(1, i)
srcChk.AppendMyDecimal(2, types.NewDecFromInt(i))
srcChk.AppendString(3, strings.Repeat(strconv.FormatInt(i, 10), int(i)))
}

destChk := NewChunkWithCapacity(fieldTypes, 3)

// Test Chunk.PreAlloc4Row.
for i := 0; i < srcChk.NumRows(); i++ {
c.Assert(destChk.NumRows(), check.Equals, i)
destChk.PreAlloc4Row(srcChk.GetRow(i))
}
for i, srcCol := range srcChk.columns {
destCol := destChk.columns[i]
c.Assert(len(srcCol.elemBuf), check.Equals, len(destCol.elemBuf))
c.Assert(len(srcCol.data), check.Equals, len(destCol.data))
c.Assert(len(srcCol.offsets), check.Equals, len(destCol.offsets))
c.Assert(len(srcCol.nullBitmap), check.Equals, len(destCol.nullBitmap))
c.Assert(srcCol.length, check.Equals, destCol.length)
c.Assert(srcCol.nullCount, check.Equals, destCol.nullCount)

for _, val := range destCol.data {
c.Assert(val == 0, check.IsTrue)
}
for j, val := range srcCol.offsets {
c.Assert(val, check.Equals, destCol.offsets[j])
}
for j, val := range srcCol.nullBitmap {
c.Assert(val, check.Equals, destCol.nullBitmap[j])
}
for _, val := range destCol.elemBuf {
c.Assert(val == 0, check.IsTrue)
}
}

// Test Chunk.Insert.
for i := srcChk.NumRows() - 1; i >= 0; i-- {
destChk.Insert(i, srcChk.GetRow(i))
}
for i, srcCol := range srcChk.columns {
destCol := destChk.columns[i]

for j, val := range srcCol.data {
c.Assert(val, check.Equals, destCol.data[j])
}
for j, val := range srcCol.offsets {
c.Assert(val, check.Equals, destCol.offsets[j])
}
for j, val := range srcCol.nullBitmap {
c.Assert(val, check.Equals, destCol.nullBitmap[j])
}
for _, val := range destCol.elemBuf {
c.Assert(val == 0, check.IsTrue)
}
}

// Test parallel Chunk.Insert.
destChk.Reset()
startWg, endWg := &sync.WaitGroup{}, &sync.WaitGroup{}
startWg.Add(1)
for i := 0; i < srcChk.NumRows(); i++ {
destChk.PreAlloc4Row(srcChk.GetRow(i))
endWg.Add(1)
go func(rowIdx int) {
defer func() {
endWg.Done()
}()
startWg.Wait()
destChk.Insert(rowIdx, srcChk.GetRow(rowIdx))
}(i)
}
startWg.Done()
endWg.Wait()
for i, srcCol := range srcChk.columns {
destCol := destChk.columns[i]

for j, val := range srcCol.data {
c.Assert(val, check.Equals, destCol.data[j])
}
for j, val := range srcCol.offsets {
c.Assert(val, check.Equals, destCol.offsets[j])
}
for j, val := range srcCol.nullBitmap {
c.Assert(val, check.Equals, destCol.nullBitmap[j])
}
for _, val := range destCol.elemBuf {
c.Assert(val == 0, check.IsTrue)
}
}
}

func BenchmarkAppendInt(b *testing.B) {
b.ReportAllocs()
chk := newChunk(8)
Expand Down
29 changes: 29 additions & 0 deletions util/chunk/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,35 @@ func (l *List) Reset() {
l.consumedIdx = -1
}

// PreAlloc4Row pre-allocates the storage memory for a Row.
// Note: this function will *ONLY* allocate the needed memory for `row`, the
// data will *NOT* be written into the List. List.Insert can be called to write
// the data later on.
func (l *List) PreAlloc4Row(row Row) (ptr RowPtr) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the comment of this function should also be updated.

chkIdx := len(l.chunks) - 1
if chkIdx == -1 || l.chunks[chkIdx].NumRows() >= l.chunks[chkIdx].Capacity() {
newChk := l.allocChunk()
l.chunks = append(l.chunks, newChk)
if chkIdx != l.consumedIdx {
l.memTracker.Consume(l.chunks[chkIdx].MemoryUsage())
l.consumedIdx = chkIdx
}
chkIdx++
}
chk := l.chunks[chkIdx]
rowIdx := chk.NumRows()
chk.PreAlloc4Row(row)
l.length++
return RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}
}

// Insert inserts `row` on the position specified by `ptr`.
// Note: Insert will cover the origin data, it should be called after
// PreAlloc4Row.
func (l *List) Insert(ptr RowPtr, row Row) {
l.chunks[ptr.ChkIdx].Insert(int(ptr.RowIdx), row)
}

// ListWalkFunc is used to walk the list.
// If error is returned, it will stop walking.
type ListWalkFunc = func(row Row) error
Expand Down
43 changes: 43 additions & 0 deletions util/chunk/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package chunk

import (
"math"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -114,6 +116,47 @@ func (s *testChunkSuite) TestListMemoryUsage(c *check.C) {
c.Assert(list.GetMemTracker().BytesConsumed(), check.Equals, memUsage+srcChk.MemoryUsage())
}

func (s *testChunkSuite) TestListPrePreAlloc4RowAndInsert(c *check.C) {
fieldTypes := make([]*types.FieldType, 0, 4)
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeLonglong})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeNewDecimal})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeVarchar})

srcChk := NewChunkWithCapacity(fieldTypes, 10)
for i := int64(0); i < 10; i++ {
srcChk.AppendFloat32(0, float32(i))
srcChk.AppendInt64(1, i)
srcChk.AppendMyDecimal(2, types.NewDecFromInt(i))
srcChk.AppendString(3, strings.Repeat(strconv.FormatInt(i, 10), int(i)))
}

srcList := NewList(fieldTypes, 3, 3)
destList := NewList(fieldTypes, 5, 5)
destRowPtr := make([]RowPtr, srcChk.NumRows())
for i := 0; i < srcChk.NumRows(); i++ {
srcList.AppendRow(srcChk.GetRow(i))
destRowPtr[i] = destList.PreAlloc4Row(srcChk.GetRow(i))
}

c.Assert(srcList.NumChunks(), check.Equals, 4)
c.Assert(destList.NumChunks(), check.Equals, 2)

iter4Src := NewIterator4List(srcList)
for row, i := iter4Src.Begin(), 0; row != iter4Src.End(); row, i = iter4Src.Next(), i+1 {
destList.Insert(destRowPtr[i], row)
}

iter4Dest := NewIterator4List(destList)
srcRow, destRow := iter4Src.Begin(), iter4Dest.Begin()
for ; srcRow != iter4Src.End(); srcRow, destRow = iter4Src.Next(), iter4Dest.Next() {
c.Assert(srcRow.GetFloat32(0), check.Equals, destRow.GetFloat32(0))
c.Assert(srcRow.GetInt64(1), check.Equals, destRow.GetInt64(1))
c.Assert(srcRow.GetMyDecimal(2).Compare(destRow.GetMyDecimal(2)) == 0, check.IsTrue)
c.Assert(srcRow.GetString(3), check.Equals, destRow.GetString(3))
}
}

func BenchmarkListMemoryUsage(b *testing.B) {
fieldTypes := make([]*types.FieldType, 0, 4)
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})
Expand Down