
util: add PreAlloc4Row and Insert for Chunk and List #7916

Merged (13 commits, Oct 18, 2018)
69 changes: 69 additions & 0 deletions util/chunk/chunk.go
@@ -15,6 +15,7 @@ package chunk

import (
	"encoding/binary"
	"reflect"
	"unsafe"

	"github.com/cznic/mathutil"
@@ -277,6 +278,74 @@ func (c *Chunk) AppendPartialRow(colIdx int, row Row) {
	}
}

// PreAlloc pre-allocates the memory space in a Chunk to store the Row.
// NOTE:
// 1. The Chunk must be empty or hold no useful data.
// 2. The schema of the Row must be the same as that of the Chunk.
// 3. This API is paired with the `Insert()` function, which inserts all the
// rows' data into the Chunk after the pre-allocation.
Review comment (Member):
How about adding another NOTE in the comment:

// 4. We set the null bitmap here instead of in the Insert() function because
//    the Insert() function is called in parallel, and the data race on a byte
//    cannot be avoided even though the manipulated bits inside the byte differ.

// 4. We set the null bitmap here instead of in the Insert() function because
// when the Insert() function is called in parallel, the data race on a byte
// cannot be avoided even though the manipulated bits inside the byte are
// different.
func (c *Chunk) PreAlloc(row Row) {
	for i, srcCol := range row.c.columns {
		dstCol := c.columns[i]
		dstCol.appendNullBitmap(!srcCol.isNull(row.idx))
Review comment (Member):
It's better to only pre-allocate the memory for the null bitmap by calling dstCol.appendNullBitmap(true) and to set the null bit mask in the Insert function.

Reply (Contributor Author):
We write the nullBitmap info here because Insert will be called in parallel later, and every element in nullBitmap is a byte. Writing the bit mask in Insert would cause a data race.
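
For illustration, a minimal standalone Go program (my own sketch, not TiDB code) showing why setting different bits of one shared byte from concurrent goroutines is still a data race: every bit update is a read-modify-write of the whole byte, so updates can be lost and the race detector flags it.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var bitmap byte // one shared byte, like a single element of nullBitmap
	var wg sync.WaitGroup
	for bit := uint(0); bit < 8; bit++ {
		wg.Add(1)
		go func(bit uint) {
			defer wg.Done()
			// Each goroutine touches a different bit, but |= is a
			// read-modify-write of the whole byte, so this races and
			// `go run -race` reports it; some updates may be lost.
			bitmap |= byte(1) << bit
		}(bit)
	}
	wg.Wait()
	fmt.Printf("expected 0xff, may observe %#x\n", bitmap)
}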

		elemLen := len(srcCol.elemBuf)
		if !srcCol.isFixed() {
			elemLen = int(srcCol.offsets[row.idx+1] - srcCol.offsets[row.idx])
			dstCol.offsets = append(dstCol.offsets, int32(len(dstCol.data)+elemLen))
		}
		dstCol.length++
		needCap := len(dstCol.data) + elemLen
		if needCap <= cap(dstCol.data) {
			(*reflect.SliceHeader)(unsafe.Pointer(&dstCol.data)).Len = len(dstCol.data) + elemLen
			continue
		}
		// Grow the capacity according to golang.growslice.
		newCap := cap(dstCol.data)
		doubleCap := newCap << 1
		if needCap > doubleCap {
			newCap = needCap
		} else {
			if len(dstCol.data) < 1024 {
				newCap = doubleCap
			} else {
				for 0 < newCap && newCap < needCap {
					newCap += newCap / 4
				}
				if newCap <= 0 {
					newCap = needCap
				}
			}
		}
		dstCol.data = make([]byte, len(dstCol.data)+elemLen, newCap)
	}
}
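
As an aside, a small sketch that mirrors the growth heuristic above so the numbers can be checked in isolation; growCap is an illustrative helper invented here and is not part of this PR.

package sketch

// growCap mirrors the golang.growslice-style rule used by PreAlloc: double
// small slices, grow large ones by ~25% steps, and jump straight to needCap
// when even doubling is not enough.
func growCap(oldCap, oldLen, needCap int) int {
	newCap := oldCap
	doubleCap := newCap << 1
	if needCap > doubleCap {
		return needCap
	}
	if oldLen < 1024 {
		return doubleCap
	}
	for 0 < newCap && newCap < needCap {
		newCap += newCap / 4
	}
	if newCap <= 0 {
		newCap = needCap
	}
	return newCap
}

// growCap(512, 512, 520)    == 1024 (small slices double)
// growCap(2048, 2048, 2100) == 2560 (large slices grow by ~25% steps)
// growCap(100, 100, 500)    == 500  (doubling is not enough, use needCap)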

// Insert inserts `row` at the position specified by `rowIdx`.
// Note: Insert will overwrite the existing data; it should be called after
// PreAlloc.
func (c *Chunk) Insert(rowIdx int, row Row) {
	for i, srcCol := range row.c.columns {
		if row.IsNull(i) {
			continue
		}
		dstCol := c.columns[i]
		var srcStart, srcEnd, destStart, destEnd int
		if srcCol.isFixed() {
			srcElemLen, destElemLen := len(srcCol.elemBuf), len(dstCol.elemBuf)
			srcStart, destStart = row.idx*srcElemLen, rowIdx*destElemLen
			srcEnd, destEnd = srcStart+srcElemLen, destStart+destElemLen
		} else {
			srcStart, srcEnd = int(srcCol.offsets[row.idx]), int(srcCol.offsets[row.idx+1])
			destStart, destEnd = int(dstCol.offsets[rowIdx]), int(dstCol.offsets[rowIdx+1])
		}
		copy(dstCol.data[destStart:destEnd], srcCol.data[srcStart:srcEnd])
	}
}
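
A minimal usage sketch of the PreAlloc/Insert pairing (an illustrative example, not code from this PR): pre-allocate serially, which also writes the offsets and the null bitmap, then copy the row data in parallel with Insert. It assumes dst and src are chunks with the same schema and that dst holds no useful data; the import path is the one this diff lives under.

package sketch

import (
	"sync"

	"github.com/pingcap/tidb/util/chunk"
)

// copyRowsParallel reserves space for every row of src serially, then fills
// the reserved rows concurrently; Insert only copies data bytes, so the
// goroutines never resize any shared slice.
func copyRowsParallel(dst, src *chunk.Chunk) {
	for i := 0; i < src.NumRows(); i++ {
		dst.PreAlloc(src.GetRow(i))
	}
	var wg sync.WaitGroup
	for i := 0; i < src.NumRows(); i++ {
		wg.Add(1)
		go func(rowIdx int) {
			defer wg.Done()
			dst.Insert(rowIdx, src.GetRow(rowIdx))
		}(i)
	}
	wg.Wait()
}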

// Append appends rows in [begin, end) in another Chunk to a Chunk.
func (c *Chunk) Append(other *Chunk, begin, end int) {
	for colID, src := range other.columns {
103 changes: 103 additions & 0 deletions util/chunk/chunk_test.go
@@ -18,6 +18,8 @@ import (
"fmt"
"math"
"strconv"
"strings"
"sync"
"testing"
"time"
"unsafe"
@@ -517,6 +519,107 @@ func (s *testChunkSuite) TestSwapColumn(c *check.C) {
	checkRef()
}

func (s *testChunkSuite) TestPreAlloc4RowAndInsert(c *check.C) {
	fieldTypes := make([]*types.FieldType, 0, 4)
	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})
	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeLonglong})
	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeNewDecimal})
	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeVarchar})

	srcChk := NewChunkWithCapacity(fieldTypes, 10)
	for i := int64(0); i < 10; i++ {
		srcChk.AppendFloat32(0, float32(i))
		srcChk.AppendInt64(1, i)
		srcChk.AppendMyDecimal(2, types.NewDecFromInt(i))
		srcChk.AppendString(3, strings.Repeat(strconv.FormatInt(i, 10), int(i)))
	}

	destChk := NewChunkWithCapacity(fieldTypes, 3)

	// Test Chunk.PreAlloc.
	for i := 0; i < srcChk.NumRows(); i++ {
		c.Assert(destChk.NumRows(), check.Equals, i)
		destChk.PreAlloc(srcChk.GetRow(i))
	}
	for i, srcCol := range srcChk.columns {
		destCol := destChk.columns[i]
		c.Assert(len(srcCol.elemBuf), check.Equals, len(destCol.elemBuf))
		c.Assert(len(srcCol.data), check.Equals, len(destCol.data))
		c.Assert(len(srcCol.offsets), check.Equals, len(destCol.offsets))
		c.Assert(len(srcCol.nullBitmap), check.Equals, len(destCol.nullBitmap))
		c.Assert(srcCol.length, check.Equals, destCol.length)
		c.Assert(srcCol.nullCount, check.Equals, destCol.nullCount)

		for _, val := range destCol.data {
			c.Assert(val == 0, check.IsTrue)
		}
		for j, val := range srcCol.offsets {
			c.Assert(val, check.Equals, destCol.offsets[j])
		}
		for j, val := range srcCol.nullBitmap {
			c.Assert(val, check.Equals, destCol.nullBitmap[j])
		}
		for _, val := range destCol.elemBuf {
			c.Assert(val == 0, check.IsTrue)
		}
	}

	// Test Chunk.Insert.
	for i := srcChk.NumRows() - 1; i >= 0; i-- {
		destChk.Insert(i, srcChk.GetRow(i))
	}
	for i, srcCol := range srcChk.columns {
		destCol := destChk.columns[i]

		for j, val := range srcCol.data {
			c.Assert(val, check.Equals, destCol.data[j])
		}
		for j, val := range srcCol.offsets {
			c.Assert(val, check.Equals, destCol.offsets[j])
		}
		for j, val := range srcCol.nullBitmap {
			c.Assert(val, check.Equals, destCol.nullBitmap[j])
		}
		for _, val := range destCol.elemBuf {
			c.Assert(val == 0, check.IsTrue)
		}
	}

	// Test parallel Chunk.Insert.
	destChk.Reset()
	startWg, endWg := &sync.WaitGroup{}, &sync.WaitGroup{}
	startWg.Add(1)
	for i := 0; i < srcChk.NumRows(); i++ {
		destChk.PreAlloc(srcChk.GetRow(i))
Review comment (Contributor):
emm... I have a question too: is there any potential need to PreAlloc many rows in one call? Or to directly PreAlloc up to the last row of the chunk? In our test case, PreAlloc is called row by row and the memory grows as each row is handled; maybe allocating the whole memory directly would be better? 😆

Reply (Contributor):
Just a question, not a review... I read too slowly 🤣

Reply (Contributor Author, @XuHuaiyu, Oct 18, 2018):
@lysu Directly pre-allocating the whole memory when meeting the last row seems reasonable. Maybe we can add an input argument like doAlloc for PreAlloc? Not that easy, though. We need a better way.
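
For illustration only, a hypothetical sketch of the batch idea discussed in this thread: compute the total size per column first, grow each column once, and then let the existing PreAlloc do the per-row bookkeeping without any further reallocation. preAllocBatch does not exist in this PR and, since it touches unexported fields, it would have to live inside package chunk.

// preAllocBatch is a hypothetical helper, not part of this PR.
func (c *Chunk) preAllocBatch(rows []Row) {
	for i, dstCol := range c.columns {
		total := 0
		for _, row := range rows {
			srcCol := row.c.columns[i]
			if srcCol.isFixed() {
				total += len(srcCol.elemBuf)
			} else {
				total += int(srcCol.offsets[row.idx+1] - srcCol.offsets[row.idx])
			}
		}
		if needCap := len(dstCol.data) + total; needCap > cap(dstCol.data) {
			// The old bytes need not be preserved (NOTE 1: the Chunk holds
			// no useful data), mirroring what PreAlloc itself does on growth.
			dstCol.data = make([]byte, len(dstCol.data), needCap)
		}
	}
	// Per-row bookkeeping (offsets, null bitmap, length) is still done row by
	// row; the capacities reserved above keep PreAlloc on its cheap path.
	for _, row := range rows {
		c.PreAlloc(row)
	}
}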

		endWg.Add(1)
		go func(rowIdx int) {
			defer func() {
				endWg.Done()
			}()
			startWg.Wait()
			destChk.Insert(rowIdx, srcChk.GetRow(rowIdx))
		}(i)
	}
	startWg.Done()
	endWg.Wait()
	for i, srcCol := range srcChk.columns {
		destCol := destChk.columns[i]

		for j, val := range srcCol.data {
			c.Assert(val, check.Equals, destCol.data[j])
		}
		for j, val := range srcCol.offsets {
			c.Assert(val, check.Equals, destCol.offsets[j])
		}
		for j, val := range srcCol.nullBitmap {
			c.Assert(val, check.Equals, destCol.nullBitmap[j])
		}
		for _, val := range destCol.elemBuf {
			c.Assert(val == 0, check.IsTrue)
		}
	}
}

func BenchmarkAppendInt(b *testing.B) {
	b.ReportAllocs()
	chk := newChunk(8)
31 changes: 31 additions & 0 deletions util/chunk/list.go
@@ -140,6 +140,37 @@ func (l *List) Reset() {
	l.consumedIdx = -1
}

// PreAlloc4Row pre-allocates the storage memory for a Row.
// NOTE:
// 1. The List must be empty or hold no useful data.
// 2. The schema of the Row must be the same as that of the List.
// 3. This API is paired with the `Insert()` function, which inserts all the
// rows' data into the List after the pre-allocation.
func (l *List) PreAlloc4Row(row Row) (ptr RowPtr) {
Review comment (Member):
The comment of this function should also be updated.

	chkIdx := len(l.chunks) - 1
	if chkIdx == -1 || l.chunks[chkIdx].NumRows() >= l.chunks[chkIdx].Capacity() {
		newChk := l.allocChunk()
		l.chunks = append(l.chunks, newChk)
		if chkIdx != l.consumedIdx {
			l.memTracker.Consume(l.chunks[chkIdx].MemoryUsage())
			l.consumedIdx = chkIdx
		}
		chkIdx++
	}
	chk := l.chunks[chkIdx]
	rowIdx := chk.NumRows()
	chk.PreAlloc(row)
	l.length++
	return RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}
}

// Insert inserts `row` at the position specified by `ptr`.
// Note: Insert will overwrite the existing data; it should be called after
// PreAlloc4Row.
func (l *List) Insert(ptr RowPtr, row Row) {
	l.chunks[ptr.ChkIdx].Insert(int(ptr.RowIdx), row)
}
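
A minimal usage sketch for the List variant (an illustrative example, not code from this PR): PreAlloc4Row reserves a slot and returns a stable RowPtr, so the slots can later be filled concurrently via List.Insert. It assumes both lists share the same schema and that dst holds no useful data.

package sketch

import (
	"sync"

	"github.com/pingcap/tidb/util/chunk"
)

// fillList reserves one slot per row of src (serially, which also writes the
// offsets and the null bitmap) and remembers the returned RowPtrs, then fills
// the slots in parallel.
func fillList(dst, src *chunk.List) {
	var ptrs []chunk.RowPtr
	var rows []chunk.Row
	iter := chunk.NewIterator4List(src)
	for row := iter.Begin(); row != iter.End(); row = iter.Next() {
		rows = append(rows, row)
		ptrs = append(ptrs, dst.PreAlloc4Row(row))
	}
	var wg sync.WaitGroup
	for i := range rows {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			dst.Insert(ptrs[i], rows[i])
		}(i)
	}
	wg.Wait()
}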

// ListWalkFunc is used to walk the list.
// If error is returned, it will stop walking.
type ListWalkFunc = func(row Row) error
43 changes: 43 additions & 0 deletions util/chunk/list_test.go
@@ -15,6 +15,8 @@ package chunk

import (
"math"
"strconv"
"strings"
"testing"
"time"

@@ -114,6 +116,47 @@ func (s *testChunkSuite) TestListMemoryUsage(c *check.C) {
	c.Assert(list.GetMemTracker().BytesConsumed(), check.Equals, memUsage+srcChk.MemoryUsage())
}

func (s *testChunkSuite) TestListPrePreAlloc4RowAndInsert(c *check.C) {
	fieldTypes := make([]*types.FieldType, 0, 4)
	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})
	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeLonglong})
	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeNewDecimal})
	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeVarchar})

	srcChk := NewChunkWithCapacity(fieldTypes, 10)
	for i := int64(0); i < 10; i++ {
		srcChk.AppendFloat32(0, float32(i))
		srcChk.AppendInt64(1, i)
		srcChk.AppendMyDecimal(2, types.NewDecFromInt(i))
		srcChk.AppendString(3, strings.Repeat(strconv.FormatInt(i, 10), int(i)))
	}

	srcList := NewList(fieldTypes, 3, 3)
	destList := NewList(fieldTypes, 5, 5)
	destRowPtr := make([]RowPtr, srcChk.NumRows())
	for i := 0; i < srcChk.NumRows(); i++ {
		srcList.AppendRow(srcChk.GetRow(i))
		destRowPtr[i] = destList.PreAlloc4Row(srcChk.GetRow(i))
	}

	c.Assert(srcList.NumChunks(), check.Equals, 4)
	c.Assert(destList.NumChunks(), check.Equals, 2)

	iter4Src := NewIterator4List(srcList)
	for row, i := iter4Src.Begin(), 0; row != iter4Src.End(); row, i = iter4Src.Next(), i+1 {
		destList.Insert(destRowPtr[i], row)
	}

	iter4Dest := NewIterator4List(destList)
	srcRow, destRow := iter4Src.Begin(), iter4Dest.Begin()
	for ; srcRow != iter4Src.End(); srcRow, destRow = iter4Src.Next(), iter4Dest.Next() {
		c.Assert(srcRow.GetFloat32(0), check.Equals, destRow.GetFloat32(0))
		c.Assert(srcRow.GetInt64(1), check.Equals, destRow.GetInt64(1))
		c.Assert(srcRow.GetMyDecimal(2).Compare(destRow.GetMyDecimal(2)) == 0, check.IsTrue)
		c.Assert(srcRow.GetString(3), check.Equals, destRow.GetString(3))
	}
}

func BenchmarkListMemoryUsage(b *testing.B) {
	fieldTypes := make([]*types.FieldType, 0, 4)
	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})