ddl, table: improve the efficiency of adding index #48184

Merged · 24 commits · Nov 2, 2023
2 changes: 0 additions & 2 deletions br/pkg/lightning/backend/external/BUILD.bazel
@@ -30,15 +30,13 @@ go_library(
"//br/pkg/storage",
"//pkg/kv",
"//pkg/metrics",
"//pkg/sessionctx/variable",
"//pkg/util/hack",
"//pkg/util/logutil",
"//pkg/util/memory",
"//pkg/util/size",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_docker_go_units//:go-units",
"@com_github_google_uuid//:uuid",
"@com_github_jfcg_sorty_v2//:sorty",
"@com_github_pingcap_errors//:errors",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/engine.go
@@ -145,7 +145,7 @@ func (e *Engine) getAdjustedConcurrency() int {
if e.checkHotspot {
// estimate we will open at most 1000 files, so if e.dataFiles is small we can
// try to concurrently process ranges.
adjusted := int(MergeSortOverlapThreshold) / len(e.dataFiles)
adjusted := maxCloudStorageConnections / len(e.dataFiles)
return min(adjusted, 8)
}
adjusted := min(e.workerConcurrency, maxCloudStorageConnections/len(e.dataFiles))
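The hotspot path above now derives its concurrency from the cloud-storage connection budget rather than from MergeSortOverlapThreshold. A minimal standalone sketch of that calculation (not part of the diff), assuming a budget of 8000 connections and a cap of 8 concurrent readers — values inferred from the updated test below, not read from the package constants:

package main

import "fmt"

// adjustedConcurrency spreads a fixed connection budget across the data files
// and caps the result, so a small file count cannot spawn unbounded readers.
// Requires Go 1.21+ for the builtin min.
func adjustedConcurrency(connBudget, numFiles, hardCap int) int {
	if numFiles == 0 { // guard added here; the original assumes files exist
		return hardCap
	}
	return min(connBudget/numFiles, hardCap)
}

func main() {
	fmt.Println(adjustedConcurrency(8000, 100, 8))  // 8, matches the updated test
	fmt.Println(adjustedConcurrency(8000, 8000, 8)) // 1, matches the updated test
}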
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/engine_test.go
@@ -337,7 +337,7 @@ func TestGetAdjustedConcurrency(t *testing.T) {
dataFiles: genFiles(100),
}
require.Equal(t, 8, e.getAdjustedConcurrency())
e.dataFiles = genFiles(1000)
e.dataFiles = genFiles(8000)
require.Equal(t, 1, e.getAdjustedConcurrency())

e.checkHotspot = false
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/external/iter.go
@@ -86,8 +86,8 @@ type mergeIter[T heapElem, R sortedReader[T]] struct {
curr T
lastReaderIdx int
err error
// determines whether to check reader hotspot, if hotspot is detected, we will
// try read this file concurrently.
checkHotspot bool
hotspotMap map[int]int
51 changes: 3 additions & 48 deletions br/pkg/lightning/backend/external/writer.go
@@ -22,21 +22,17 @@ import (
"path/filepath"
"slices"
"strconv"
"sync"
"time"

"github.com/docker/go-units"
"github.com/jfcg/sorty/v2"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"go.uber.org/zap"
@@ -106,10 +102,6 @@ type WriterBuilder struct {
propKeysDist uint64
onClose OnCloseFunc
keyDupeEncoding bool
// This mutex is used to make sure the writer is flushed mutually exclusively in a TiDB server.
mu *sync.Mutex

bufferPool *membuf.Pool
}

// NewWriterBuilder creates a WriterBuilder.
Expand Down Expand Up @@ -159,24 +151,12 @@ func (b *WriterBuilder) SetOnCloseFunc(onClose OnCloseFunc) *WriterBuilder {
return b
}

// SetBufferPool sets the buffer pool of the writer.
func (b *WriterBuilder) SetBufferPool(bufferPool *membuf.Pool) *WriterBuilder {
b.bufferPool = bufferPool
return b
}

// SetKeyDuplicationEncoding sets if the writer can distinguish duplicate key.
func (b *WriterBuilder) SetKeyDuplicationEncoding(val bool) *WriterBuilder {
b.keyDupeEncoding = val
return b
}

// SetMutex sets the mutex of the writer.
func (b *WriterBuilder) SetMutex(mu *sync.Mutex) *WriterBuilder {
b.mu = mu
return b
}

// SetBlockSize sets the block size of pre-allocated buf in the writer.
func (b *WriterBuilder) SetBlockSize(blockSize int) *WriterBuilder {
b.blockSize = blockSize
@@ -190,10 +170,6 @@ func (b *WriterBuilder) Build(
prefix string,
writerID string,
) *Writer {
bp := b.bufferPool
if bp == nil {
bp = membuf.NewPool()
}
filenamePrefix := filepath.Join(prefix, writerID)
keyAdapter := common.KeyAdapter(common.NoopKeyAdapter{})
if b.keyDupeEncoding {
Expand All @@ -219,7 +195,6 @@ func (b *WriterBuilder) Build(
multiFileStats: make([]MultipleFilesStat, 1),
fileMinKeys: make([]tidbkv.Key, 0, multiFileStatNum),
fileMaxKeys: make([]tidbkv.Key, 0, multiFileStatNum),
shareMu: b.mu,
}
ret.multiFileStats[0].Filenames = make([][2]string, 0, multiFileStatNum)
return ret
@@ -312,8 +287,6 @@ type Writer struct {
minKey tidbkv.Key
maxKey tidbkv.Key
totalSize uint64
// This mutex is used to make sure the writer is flushed mutually exclusively in a TiDB server.
shareMu *sync.Mutex
}

// WriteRow implements ingest.Writer.
@@ -406,10 +379,6 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
if len(w.kvLocations) == 0 {
return nil
}
if w.shareMu != nil {
w.shareMu.Lock()
defer w.shareMu.Unlock()
}

logger := logutil.Logger(ctx)
dataFile, statFile, dataWriter, statWriter, err := w.createStorageWriter(ctx)
@@ -470,23 +439,9 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
}()

sortStart := time.Now()
if w.shareMu != nil {
sorty.MaxGor = min(8, uint64(variable.GetDDLReorgWorkerCounter()))
Review comment (Contributor): we don't need sorty library? It has better performance

Reply (Contributor Author): Do you mean setting sorty.MaxGor to 1?

sorty.Sort(len(w.kvLocations), func(i, j, r, s int) bool {
posi, posj := w.kvLocations[i], w.kvLocations[j]
if bytes.Compare(w.getKeyByLoc(posi), w.getKeyByLoc(posj)) < 0 {
if r != s {
w.kvLocations[r], w.kvLocations[s] = w.kvLocations[s], w.kvLocations[r]
}
return true
}
return false
})
} else {
slices.SortFunc(w.kvLocations, func(i, j kvLocation) int {
return bytes.Compare(w.getKeyByLoc(i), w.getKeyByLoc(j))
})
}
slices.SortFunc(w.kvLocations, func(i, j kvLocation) int {
return bytes.Compare(w.getKeyByLoc(i), w.getKeyByLoc(j))
})
sortDuration = time.Since(sortStart)

writeStartTime = time.Now()
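The hunk above drops the sorty-based parallel sort, along with the mutex that serialized flushes across writers, and always sorts with the standard library. A self-contained sketch of the same pattern (not part of the diff), sorting location records by the key bytes they reference; kvLoc and keyOf are illustrative stand-ins for the package's kvLocation and getKeyByLoc:

package main

import (
	"bytes"
	"fmt"
	"slices"
)

// kvLoc points at a key stored in a shared buffer rather than owning the bytes,
// loosely mirroring the writer's kvLocation.
type kvLoc struct {
	offset, length int
}

func main() {
	buf := []byte("bananaapplecherry")
	locs := []kvLoc{
		{offset: 0, length: 6},  // "banana"
		{offset: 6, length: 5},  // "apple"
		{offset: 11, length: 6}, // "cherry"
	}
	keyOf := func(l kvLoc) []byte { return buf[l.offset : l.offset+l.length] }

	// Single-threaded sort by key bytes, the same shape as the slices.SortFunc
	// call that replaces the sorty path in this PR.
	slices.SortFunc(locs, func(a, b kvLoc) int {
		return bytes.Compare(keyOf(a), keyOf(b))
	})

	for _, l := range locs {
		fmt.Println(string(keyOf(l))) // apple, banana, cherry
	}
}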
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/kv2sql.go
@@ -104,7 +104,7 @@ func (t *TableKVDecoder) IterRawIndexKeys(h kv.Handle, rawRow []byte, fn func([]
}
iter := index.GenIndexKVIter(t.se.Vars.StmtCtx, indexValues, h, nil)
for iter.Valid() {
indexKey, _, _, err := iter.Next(indexBuffer)
indexKey, _, _, err := iter.Next(indexBuffer, nil)
if err != nil {
return err
}
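The call site above now passes a second buffer argument to iter.Next. A hypothetical sketch of the buffer-reuse pattern this enables (not part of the diff); kvIter and its methods are invented for illustration — only the idea of caller-provided scratch buffers comes from the change:

package main

import "fmt"

// kvIter is an invented iterator used only to illustrate caller-provided
// scratch buffers: Next appends into the buffers instead of allocating on
// every call. The real GenIndexKVIter API may differ.
type kvIter struct {
	rows [][2]string
	pos  int
}

func (it *kvIter) Valid() bool { return it.pos < len(it.rows) }

func (it *kvIter) Next(keyBuf, valBuf []byte) (key, val []byte, err error) {
	r := it.rows[it.pos]
	it.pos++
	key = append(keyBuf[:0], r[0]...) // reuse the caller's backing array
	val = append(valBuf[:0], r[1]...)
	return key, val, nil
}

func main() {
	it := &kvIter{rows: [][2]string{{"k1", "v1"}, {"k2", "v2"}}}
	keyBuf := make([]byte, 0, 64) // allocated once, reused across iterations
	valBuf := make([]byte, 0, 64)
	for it.Valid() {
		k, v, err := it.Next(keyBuf, valBuf)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> %s\n", k, v)
	}
}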
5 changes: 0 additions & 5 deletions br/pkg/lightning/backend/local/local.go
@@ -1578,11 +1578,6 @@ func (local *Backend) GetRegionSplitSizeKeys(ctx context.Context) (finalSize int
return GetRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
}

// GetMutex returns the mutex of the backend.
func (local *Backend) GetMutex() *sync.Mutex {
return &local.mu
}

// expose these variables to unit test.
var (
testJobToWorkerCh = make(chan *regionJob)
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
@@ -183,6 +183,7 @@ go_test(
"attributes_sql_test.go",
"backfilling_dispatcher_test.go",
"backfilling_test.go",
"bench_test.go",
"cancel_test.go",
"cluster_test.go",
"column_change_test.go",
17 changes: 3 additions & 14 deletions pkg/ddl/backfilling_operators.go
@@ -20,7 +20,6 @@ import (
"fmt"
"path"
"strconv"
"sync"
"sync/atomic"
"time"

@@ -42,7 +41,6 @@ import (
"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
"github.com/pingcap/tidb/pkg/resourcemanager/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
@@ -172,7 +170,6 @@ func NewWriteIndexToExternalStoragePipeline(
totalRowCount *atomic.Int64,
metricCounter prometheus.Counter,
onClose external.OnCloseFunc,
bcctx ingest.BackendCtx,
) (*operator.AsyncPipeline, error) {
indexes := make([]table.Index, 0, len(idxInfos))
for _, idxInfo := range idxInfos {
@@ -189,8 +186,7 @@
for i := 0; i < poolSize; i++ {
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, copReadBatchSize())
}
readerCnt := int(variable.GetDDLReorgWorkerCounter())
writerCnt := 1
readerCnt, writerCnt := expectedIngestWorkerCnt()

backend, err := storage.ParseBackend(extStoreURI, nil)
if err != nil {
@@ -201,21 +197,16 @@
return nil, err
}

var shareMu *sync.Mutex
if bcctx != nil {
shareMu = bcctx.GetLocalBackend().GetMutex()
}

memTotal, err := memory.MemTotal()
if err != nil {
return nil, err
}
memSize := (memTotal / 2) / uint64(len(indexes))
memSize := (memTotal / 2) / uint64(writerCnt) / uint64(len(indexes))

srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt)
writeOp := NewWriteExternalStoreOperator(
ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, onClose, shareMu, memSize)
ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, onClose, memSize)
sinkOp := newIndexWriteResultSink(ctx, nil, tbl, indexes, totalRowCount, metricCounter)

operator.Compose[TableScanTask](srcOp, scanOp)
@@ -499,7 +490,6 @@ func NewWriteExternalStoreOperator(
srcChunkPool chan *chunk.Chunk,
concurrency int,
onClose external.OnCloseFunc,
shareMu *sync.Mutex,
memoryQuota uint64,
) *WriteExternalStoreOperator {
pool := workerpool.NewWorkerPool(
@@ -512,7 +502,6 @@
builder := external.NewWriterBuilder().
SetOnCloseFunc(onClose).
SetKeyDuplicationEncoding(index.Meta().Unique).
SetMutex(shareMu).
SetMemorySizeLimit(memoryQuota)
writerID := uuid.New().String()
prefix := path.Join(strconv.Itoa(int(jobID)), strconv.Itoa(int(subtaskID)))
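In the hunks above, the pipeline takes its reader and writer counts from expectedIngestWorkerCnt() and divides the memory budget by the writer count as well as by the number of indexes. A small sketch of that arithmetic (not part of the diff), with made-up example values:

package main

import "fmt"

// perWriterQuota mirrors the memSize expression in the diff: half of total
// memory, split evenly across writers and across the indexes being built.
func perWriterQuota(memTotal uint64, writerCnt, indexCnt int) uint64 {
	return (memTotal / 2) / uint64(writerCnt) / uint64(indexCnt)
}

func main() {
	const gib = uint64(1) << 30
	// Example values only: a 16 GiB host, 4 writers, 2 indexes -> 1 GiB each.
	fmt.Println(perWriterQuota(16*gib, 4, 2) / gib) // 1
}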
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_read_index.go
@@ -274,5 +274,5 @@ func (r *readIndexExecutor) buildExternalStorePipeline(
metrics.GenerateReorgLabel("add_idx_rate", r.job.SchemaName, tbl.Meta().Name.O))
return NewWriteIndexToExternalStoragePipeline(
opCtx, d.store, r.cloudStorageURI, r.d.sessPool, sessCtx, r.job.ID, subtaskID,
tbl, r.indexes, start, end, totalRowCount, counter, onClose, r.bc)
tbl, r.indexes, start, end, totalRowCount, counter, onClose)
}
100 changes: 100 additions & 0 deletions pkg/ddl/bench_test.go
@@ -0,0 +1,100 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl_test

import (
"testing"

"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/copr"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/stretchr/testify/require"
)

func BenchmarkExtractDatumByOffsets(b *testing.B) {
store, dom := testkit.CreateMockStoreAndDomain(b)
tk := testkit.NewTestKit(b, store)
tk.MustExec("use test")

tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a bigint, b int, index idx (b));")
for i := 0; i < 8; i++ {
tk.MustExec("insert into t values (?, ?)", i, i)
}
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(b, err)
tblInfo := tbl.Meta()
idxInfo := tblInfo.FindIndexByName("idx")
copCtx, err := copr.NewCopContextSingleIndex(tblInfo, idxInfo, tk.Session(), "")
require.NoError(b, err)
startKey := tbl.RecordPrefix()
endKey := startKey.PrefixNext()
txn, err := store.Begin()
require.NoError(b, err)
copChunk := ddl.FetchChunk4Test(copCtx, tbl.(table.PhysicalTable), startKey, endKey, store, 10)
require.NoError(b, err)
require.NoError(b, txn.Rollback())

handleDataBuf := make([]types.Datum, len(copCtx.GetBase().HandleOutputOffsets))

iter := chunk.NewIterator4Chunk(copChunk)
row := iter.Begin()
c := copCtx.GetBase()
offsets := copCtx.IndexColumnOutputOffsets(idxInfo.ID)

b.ResetTimer()
for i := 0; i < b.N; i++ {
ddl.ExtractDatumByOffsetsForTest(row, offsets, c.ExprColumnInfos, handleDataBuf)
}
}

func BenchmarkGenerateIndexKV(b *testing.B) {
store, dom := testkit.CreateMockStoreAndDomain(b)
tk := testkit.NewTestKit(b, store)
tk.MustExec("use test")

tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a bigint, b int, index idx (b));")
for i := 0; i < 8; i++ {
tk.MustExec("insert into t values (?, ?)", i, i)
}
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(b, err)
tblInfo := tbl.Meta()
idxInfo := tblInfo.FindIndexByName("idx")

index := tables.NewIndex(tblInfo.ID, tblInfo, idxInfo)
sctx := tk.Session().GetSessionVars().StmtCtx
idxDt := []types.Datum{types.NewIntDatum(10)}
buf := make([]byte, 0, 64)
handle := kv.IntHandle(1)

b.ResetTimer()
for i := 0; i < b.N; i++ {
buf = buf[:0]
iter := index.GenIndexKVIter(sctx, idxDt, handle, nil)
_, _, _, err = iter.Next(buf, nil)
if err != nil {
break
}
}
require.NoError(b, err)
}
8 changes: 6 additions & 2 deletions pkg/ddl/export_test.go
@@ -59,10 +59,14 @@ func FetchChunk4Test(copCtx copr.CopContext, tbl table.PhysicalTable, startKey,
}

func ConvertRowToHandleAndIndexDatum(
handleDataBuf, idxDataBuf []types.Datum,
row chunk.Row, copCtx copr.CopContext, idxID int64) (kv.Handle, []types.Datum, error) {
c := copCtx.GetBase()
idxData := extractDatumByOffsets(row, copCtx.IndexColumnOutputOffsets(idxID), c.ExprColumnInfos, nil)
handleData := extractDatumByOffsets(row, c.HandleOutputOffsets, c.ExprColumnInfos, nil)
idxData := extractDatumByOffsets(row, copCtx.IndexColumnOutputOffsets(idxID), c.ExprColumnInfos, idxDataBuf)
handleData := extractDatumByOffsets(row, c.HandleOutputOffsets, c.ExprColumnInfos, handleDataBuf)
handle, err := buildHandle(handleData, c.TableInfo, c.PrimaryKeyInfo, stmtctx.NewStmtCtxWithTimeZone(time.Local))
return handle, idxData, err
}

// ExtractDatumByOffsetsForTest is used for test.
var ExtractDatumByOffsetsForTest = extractDatumByOffsets
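ConvertRowToHandleAndIndexDatum now threads caller-provided datum buffers into extractDatumByOffsets instead of passing nil, which is the path BenchmarkExtractDatumByOffsets above exercises. A toy benchmark sketch of the allocation difference (not part of the diff), to be placed in a *_test.go file; extractInto is an invented stand-in, not the real extractDatumByOffsets:

package bufreuse

import "testing"

// extractInto copies the selected offsets of row into dst, allocating only
// when the caller passes nil.
func extractInto(row []int64, offsets []int, dst []int64) []int64 {
	if dst == nil {
		dst = make([]int64, 0, len(offsets))
	}
	dst = dst[:0]
	for _, off := range offsets {
		dst = append(dst, row[off])
	}
	return dst
}

func BenchmarkAllocPerCall(b *testing.B) {
	row := []int64{1, 2, 3, 4}
	offsets := []int{0, 2}
	for i := 0; i < b.N; i++ {
		_ = extractInto(row, offsets, nil) // fresh slice every iteration
	}
}

func BenchmarkReuseBuffer(b *testing.B) {
	row := []int64{1, 2, 3, 4}
	offsets := []int{0, 2}
	buf := make([]int64, 0, len(offsets)) // allocated once, reused
	for i := 0; i < b.N; i++ {
		buf = extractInto(row, offsets, buf)
	}
}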