Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl, table: improve the efficiency of adding index #48184

Merged
merged 24 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func TestGetAdjustedConcurrency(t *testing.T) {
dataFiles: genFiles(100),
}
require.Equal(t, 8, e.getAdjustedConcurrency())
e.dataFiles = genFiles(1000)
e.dataFiles = genFiles(8000)
require.Equal(t, 1, e.getAdjustedConcurrency())

e.checkHotspot = false
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/external/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ type mergeIter[T heapElem, R sortedReader[T]] struct {
curr T
lastReaderIdx int
err error
// determines whether to check reader hotspot, if hotspot is detected, we will

// determines whether to check reader hotspot, if hotspot is detected, we will
tangenta marked this conversation as resolved.
Show resolved Hide resolved
// try to read this file concurrently.
checkHotspot bool
hotspotMap map[int]int
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (

// MergeSortOverlapThreshold is the threshold of overlap between sorted kv files.
// if the overlap ratio is greater than this threshold, we will merge the files.
MergeSortOverlapThreshold int64 = 1000
MergeSortOverlapThreshold int64 = 8000
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
// MergeSortFileCountStep is the step of file count when we split the sorted kv files.
MergeSortFileCountStep = 1000
)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/kv2sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (t *TableKVDecoder) IterRawIndexKeys(h kv.Handle, rawRow []byte, fn func([]
}
iter := index.GenIndexKVIter(t.se.Vars.StmtCtx, indexValues, h, nil)
for iter.Valid() {
indexKey, _, _, err := iter.Next(indexBuffer)
indexKey, _, _, err := iter.Next(indexBuffer, nil)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ go_test(
"attributes_sql_test.go",
"backfilling_dispatcher_test.go",
"backfilling_test.go",
"bench_test.go",
"cancel_test.go",
"cluster_test.go",
"column_change_test.go",
Expand Down
100 changes: 100 additions & 0 deletions pkg/ddl/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl_test

import (
"testing"

"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/copr"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/stretchr/testify/require"
)

// BenchmarkExtractDatumByOffsets measures extracting the column datums of
// index "idx" from the first row of a fetched coprocessor chunk into a reused
// datum buffer.
func BenchmarkExtractDatumByOffsets(b *testing.B) {
	store, dom := testkit.CreateMockStoreAndDomain(b)
	tk := testkit.NewTestKit(b, store)
	tk.MustExec("use test")

	tk.MustExec("drop table if exists t;")
	tk.MustExec("create table t (a bigint, b int, index idx (b));")
	for i := 0; i < 8; i++ {
		tk.MustExec("insert into t values (?, ?)", i, i)
	}
	tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
	require.NoError(b, err)
	tblInfo := tbl.Meta()
	idxInfo := tblInfo.FindIndexByName("idx")
	copCtx, err := copr.NewCopContextSingleIndex(tblInfo, idxInfo, tk.Session(), "")
	require.NoError(b, err)
	startKey := tbl.RecordPrefix()
	endKey := startKey.PrefixNext()
	txn, err := store.Begin()
	require.NoError(b, err)
	// FetchChunk4Test returns only the chunk, so there is no new error to
	// assert on after this call.
	copChunk := ddl.FetchChunk4Test(copCtx, tbl.(table.PhysicalTable), startKey, endKey, store, 10)
	require.NoError(b, txn.Rollback())

	iter := chunk.NewIterator4Chunk(copChunk)
	row := iter.Begin()
	c := copCtx.GetBase()
	offsets := copCtx.IndexColumnOutputOffsets(idxInfo.ID)
	// The extractor writes one datum per offset into the buffer, so the buffer
	// must be sized from the offsets that are actually extracted rather than
	// from the handle offsets.
	datumBuf := make([]types.Datum, len(offsets))

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		ddl.ExtractDatumByOffsetsForTest(row, offsets, c.ExprColumnInfos, datumBuf)
	}
}

// BenchmarkGenerateIndexKV measures producing one index key/value pair for
// index "idx" through the generator returned by GenIndexKVIter, reusing the
// same key buffer on every iteration.
func BenchmarkGenerateIndexKV(b *testing.B) {
	store, dom := testkit.CreateMockStoreAndDomain(b)
	tk := testkit.NewTestKit(b, store)
	setup := []string{
		"use test",
		"drop table if exists t;",
		"create table t (a bigint, b int, index idx (b));",
	}
	for _, stmt := range setup {
		tk.MustExec(stmt)
	}
	for n := 0; n < 8; n++ {
		tk.MustExec("insert into t values (?, ?)", n, n)
	}

	tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
	require.NoError(b, err)
	meta := tbl.Meta()
	idx := tables.NewIndex(meta.ID, meta, meta.FindIndexByName("idx"))

	stmtCtx := tk.Session().GetSessionVars().StmtCtx
	datums := []types.Datum{types.NewIntDatum(10)}
	keyBuf := make([]byte, 0, 64)
	h := kv.IntHandle(1)

	b.ResetTimer()
	// Stop at the first failure; the error is asserted once after the loop.
	for n := 0; n < b.N && err == nil; n++ {
		keyBuf = keyBuf[:0]
		gen := idx.GenIndexKVIter(stmtCtx, datums, h, nil)
		_, _, _, err = gen.Next(keyBuf, nil)
	}
	require.NoError(b, err)
}
8 changes: 6 additions & 2 deletions pkg/ddl/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,14 @@ func FetchChunk4Test(copCtx copr.CopContext, tbl table.PhysicalTable, startKey,
}

// ConvertRowToHandleAndIndexDatum extracts, from one coprocessor chunk row,
// the row handle and the column datums of the index identified by idxID.
//
// handleDataBuf and idxDataBuf are caller-owned scratch buffers handed to
// extractDatumByOffsets. The returned index datums are whatever that helper
// returns for idxDataBuf and may therefore alias the buffer: callers that
// keep results across rows should copy them before the next call.
//
// The handle is built with a fresh statement context in the local time zone.
func ConvertRowToHandleAndIndexDatum(
	handleDataBuf, idxDataBuf []types.Datum,
	row chunk.Row, copCtx copr.CopContext, idxID int64) (kv.Handle, []types.Datum, error) {
	c := copCtx.GetBase()
	idxData := extractDatumByOffsets(row, copCtx.IndexColumnOutputOffsets(idxID), c.ExprColumnInfos, idxDataBuf)
	handleData := extractDatumByOffsets(row, c.HandleOutputOffsets, c.ExprColumnInfos, handleDataBuf)
	handle, err := buildHandle(handleData, c.TableInfo, c.PrimaryKeyInfo, stmtctx.NewStmtCtxWithTimeZone(time.Local))
	return handle, idxData, err
}

// ExtractDatumByOffsetsForTest exposes the unexported extractDatumByOffsets
// so that code outside this package (such as the ddl_test benchmarks) can
// call it. It is used for test only.
var ExtractDatumByOffsetsForTest = extractDatumByOffsets
14 changes: 7 additions & 7 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1643,7 +1643,7 @@ func (w *addIndexTxnWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords [
if cnt < len(w.idxKeyBufs) {
buf = w.idxKeyBufs[cnt]
}
key, val, distinct, err := iter.Next(buf)
key, val, distinct, err := iter.Next(buf, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1804,25 +1804,24 @@ func writeChunkToLocal(
}
}()
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
handleDataBuf = handleDataBuf[:0]
handleDataBuf := extractDatumByOffsets(row, c.HandleOutputOffsets, c.ExprColumnInfos, handleDataBuf)
handle, err := buildHandle(handleDataBuf, c.TableInfo, c.PrimaryKeyInfo, sCtx)
h, err := buildHandle(handleDataBuf, c.TableInfo, c.PrimaryKeyInfo, sCtx)
if err != nil {
return 0, nil, errors.Trace(err)
}
for i, index := range indexes {
idxID := index.Meta().ID
idxDataBuf = idxDataBuf[:0]
idxDataBuf = extractDatumByOffsets(
row, copCtx.IndexColumnOutputOffsets(idxID), c.ExprColumnInfos, idxDataBuf)
idxData := idxDataBuf[:len(index.Meta().Columns)]
rsData := getRestoreData(c.TableInfo, copCtx.IndexInfo(idxID), c.PrimaryKeyInfo, handleDataBuf)
err = writeOneKVToLocal(ctx, writers[i], index, sCtx, writeBufs, idxDataBuf, rsData, handle)
err = writeOneKVToLocal(ctx, writers[i], index, sCtx, writeBufs, idxData, rsData, h)
if err != nil {
return 0, nil, errors.Trace(err)
}
}
count++
lastHandle = handle
lastHandle = h
}
return count, lastHandle, nil
}
Expand All @@ -1849,7 +1848,7 @@ func writeOneKVToLocal(
) error {
iter := index.GenIndexKVIter(sCtx, idxDt, handle, rsData)
for iter.Valid() {
key, idxVal, _, err := iter.Next(writeBufs.IndexKeyBuf)
key, idxVal, _, err := iter.Next(writeBufs.IndexKeyBuf, writeBufs.RowValBuf)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1867,6 +1866,7 @@ func writeOneKVToLocal(
failpoint.Return(errors.New("mock engine error"))
})
writeBufs.IndexKeyBuf = key
writeBufs.RowValBuf = idxVal
}
return nil
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,9 @@ func constructTableScanPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, col
}

func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.Column, buf []types.Datum) []types.Datum {
for _, offset := range offsets {
for i, offset := range offsets {
c := expCols[offset]
rowDt := row.GetDatum(offset, c.GetType())
buf = append(buf, rowDt)
row.DatumWithBuffer(offset, c.GetType(), &buf[i])
}
return buf
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/ddl/index_cop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,16 @@ func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) {
iter := chunk.NewIterator4Chunk(copChunk)
handles := make([]kv.Handle, 0, copChunk.NumRows())
values := make([][]types.Datum, 0, copChunk.NumRows())
handleDataBuf := make([]types.Datum, len(copCtx.GetBase().HandleOutputOffsets))
idxDataBuf := make([]types.Datum, len(idxInfo.Columns))

for row := iter.Begin(); row != iter.End(); row = iter.Next() {
handle, idxDatum, err := ddl.ConvertRowToHandleAndIndexDatum(row, copCtx, idxInfo.ID)
handle, idxDatum, err := ddl.ConvertRowToHandleAndIndexDatum(handleDataBuf, idxDataBuf, row, copCtx, idxInfo.ID)
require.NoError(t, err)
handles = append(handles, handle)
values = append(values, idxDatum)
copiedIdxDatum := make([]types.Datum, len(idxDatum))
copy(copiedIdxDatum, idxDatum)
values = append(values, copiedIdxDatum)
}
return handles, values
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (e *RecoverIndexExec) batchMarkDup(txn kv.Transaction, rows []recoverRows)
if cnt < len(e.idxKeyBufs) {
buf = e.idxKeyBufs[cnt]
}
key, _, distinct, err := iter.Next(buf)
key, _, distinct, err := iter.Next(buf, nil)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D
// due to we only care about distinct key.
iter := v.GenIndexKVIter(ctx.GetSessionVars().StmtCtx, colVals, kv.IntHandle(0), nil)
for iter.Valid() {
key, _, distinct, err1 := iter.Next(nil)
key, _, distinct, err1 := iter.Next(nil, nil)
if err1 != nil {
return nil, err1
}
Expand Down
74 changes: 66 additions & 8 deletions pkg/table/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,6 @@ func WithCtx(ctx context.Context) CreateIdxOptFunc {
}
}

// IndexIter is index kvs iter.
// Callers in this change drive it with a `for iter.Valid() { iter.Next(...) }`
// loop.
type IndexIter interface {
// Next takes a caller-supplied key buffer kb. Call sites bind the four
// results as (key, value, distinct, err).
Next(kb []byte) ([]byte, []byte, bool, error)
// Valid is used as the loop condition by callers; presumably it reports
// whether another kv can still be produced by Next.
Valid() bool
}

// Index is the interface for index data on KV store.
type Index interface {
// Meta returns IndexInfo.
Expand All @@ -86,15 +80,79 @@ type Index interface {
// Delete supports delete from statement.
Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error
// GenIndexKVIter generates index keys and values for a multi-valued index; it uses an iterator to reduce memory allocation.
GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum) IndexIter
GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum) IndexKVGenerator
// Exist supports check index exists or not.
Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) (bool, kv.Handle, error)
// GenIndexKey generates an index key. If the index is a multi-valued index, use GenIndexKVIter instead.
GenIndexKey(sc *stmtctx.StatementContext, indexedValues []types.Datum, h kv.Handle, buf []byte) (key []byte, distinct bool, err error)
// GenIndexValue generates an index value.
GenIndexValue(sc *stmtctx.StatementContext, distinct bool, indexedValues []types.Datum, h kv.Handle, restoredData []types.Datum) ([]byte, error)
GenIndexValue(sc *stmtctx.StatementContext, distinct bool, indexedValues []types.Datum, h kv.Handle, restoredData []types.Datum, buf []byte) ([]byte, error)
// FetchValues fetched index column values in a row.
// Param columns is a reused buffer, if it is not nil, FetchValues will fill the index values in it,
// and return the buffer, if it is nil, FetchValues will allocate the buffer instead.
FetchValues(row []types.Datum, columns []types.Datum) ([]types.Datum, error)
}

// IndexKVGenerator generates kv for an index.
// It could be also used for generating multi-value indexes.
//
// Which of the two data fields is consulted depends on allIdxVals: when it is
// empty the generator works on idxVals, otherwise on allIdxVals[i].
type IndexKVGenerator struct {
// index builds the key and the value (GenIndexKey / GenIndexValue).
index Index
// sCtx is the statement context passed to key and value generation.
sCtx *stmtctx.StatementContext
// handle is the row handle passed to key and value generation.
handle kv.Handle
// handleRestoreData is forwarded to GenIndexValue as the restored data.
handleRestoreData []types.Datum

// Only used by multi-value index.
// allIdxVals holds one datum slice per index kv to generate.
allIdxVals [][]types.Datum
// i counts completed Next calls; it selects the current entry of allIdxVals
// and is also what Valid inspects in the non multi-value case.
i int
// Only used by non multi-value index.
idxVals []types.Datum
}

// NewIndexKVGenerator builds an IndexKVGenerator positioned before its first
// kv.
//
// mvIndexData supplies the per-kv datum slices of a multi-value index, while
// singleIdxData supplies the single datum slice of any other index; the
// generator falls back to singleIdxData whenever mvIndexData is empty.
func NewIndexKVGenerator(
	index Index,
	stmtCtx *stmtctx.StatementContext,
	handle kv.Handle,
	handleRestoredData []types.Datum,
	mvIndexData [][]types.Datum,
	singleIdxData []types.Datum,
) IndexKVGenerator {
	// The cursor i is left at its zero value, i.e. before the first kv.
	var gen IndexKVGenerator
	gen.index = index
	gen.sCtx = stmtCtx
	gen.handle = handle
	gen.handleRestoreData = handleRestoredData
	gen.allIdxVals = mvIndexData
	gen.idxVals = singleIdxData
	return gen
}

// Next generates the next index key and value and advances the generator.
// For non multi-value indexes, there is only one index kv.
//
// keyBuf and valBuf are handed to GenIndexKey and GenIndexValue respectively
// as buffers. The results are the key, the value and the distinct flag
// reported by GenIndexKey. On failure the generator is not advanced and only
// the error is returned.
func (iter *IndexKVGenerator) Next(keyBuf, valBuf []byte) ([]byte, []byte, bool, error) {
	// Without multi-value data the single datum slice is the only source.
	datums := iter.idxVals
	if len(iter.allIdxVals) != 0 {
		datums = iter.allIdxVals[iter.i]
	}

	idxKey, isDistinct, err := iter.index.GenIndexKey(iter.sCtx, datums, iter.handle, keyBuf)
	if err != nil {
		return nil, nil, false, err
	}

	value, err := iter.index.GenIndexValue(iter.sCtx, isDistinct, datums, iter.handle, iter.handleRestoreData, valBuf)
	if err != nil {
		return nil, nil, false, err
	}

	iter.i++
	return idxKey, value, isDistinct, nil
}

// Valid returns true if the generator is exhausted.
tangenta marked this conversation as resolved.
Show resolved Hide resolved
func (iter *IndexKVGenerator) Valid() bool {
if len(iter.allIdxVals) == 0 {
return iter.i == 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's only Valid() before the first Next() call?

Copy link
Contributor Author

@tangenta tangenta Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. For non multi-value indexes, there is only one index kv per row.

}
return iter.i < len(iter.allIdxVals)
}
Loading