ddl, table: improve the efficiency of adding index (#48184) (#48250)
ref #47757
ti-chi-bot authored Nov 2, 2023
1 parent 3f495cc commit d3d5972
Showing 30 changed files with 292 additions and 158 deletions.
2 changes: 0 additions & 2 deletions br/pkg/lightning/backend/external/BUILD.bazel
@@ -30,7 +30,6 @@ go_library(
"//br/pkg/storage",
"//pkg/kv",
"//pkg/metrics",
"//pkg/sessionctx/variable",
"//pkg/util/hack",
"//pkg/util/logutil",
"//pkg/util/mathutil",
@@ -39,7 +38,6 @@
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_docker_go_units//:go-units",
"@com_github_google_uuid//:uuid",
"@com_github_jfcg_sorty_v2//:sorty",
"@com_github_pingcap_errors//:errors",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/engine.go
@@ -145,7 +145,7 @@ func (e *Engine) getAdjustedConcurrency() int {
if e.checkHotspot {
// estimate we will open at most 1000 files, so if e.dataFiles is small we can
// try to concurrently process ranges.
-adjusted := int(MergeSortOverlapThreshold) / len(e.dataFiles)
+adjusted := maxCloudStorageConnections / len(e.dataFiles)
return min(adjusted, 8)
}
adjusted := min(e.workerConcurrency, maxCloudStorageConnections/len(e.dataFiles))
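A minimal sketch of the adjusted-concurrency logic after this change (an assumed reconstruction, not the verbatim TiDB source). The constant value of 1000 follows the "at most 1000 files" comment, and the floor of 1 is an assumption consistent with the updated test below, which expects a concurrency of 1 for 8000 data files.

package main

import "fmt"

// Assumed value, per the "estimate we will open at most 1000 files" comment.
const maxCloudStorageConnections = 1000

// adjustedConcurrency mirrors Engine.getAdjustedConcurrency after this commit.
func adjustedConcurrency(checkHotspot bool, workerConcurrency, numDataFiles int) int {
	if checkHotspot {
		// Few data files: spread the connection budget across them, capped at 8.
		return max(min(maxCloudStorageConnections/numDataFiles, 8), 1)
	}
	return max(min(workerConcurrency, maxCloudStorageConnections/numDataFiles), 1)
}

func main() {
	fmt.Println(adjustedConcurrency(true, 8, 100))  // 8
	fmt.Println(adjustedConcurrency(true, 8, 8000)) // 1
}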
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/engine_test.go
@@ -337,7 +337,7 @@ func TestGetAdjustedConcurrency(t *testing.T) {
dataFiles: genFiles(100),
}
require.Equal(t, 8, e.getAdjustedConcurrency())
-e.dataFiles = genFiles(1000)
+e.dataFiles = genFiles(8000)
require.Equal(t, 1, e.getAdjustedConcurrency())

e.checkHotspot = false
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/external/iter.go
@@ -86,8 +86,8 @@ type mergeIter[T heapElem, R sortedReader[T]] struct {
curr T
lastReaderIdx int
err error
// determines whether to check reader hotspot, if hotspot is detected, we will
// try read this file concurrently.
checkHotspot bool
hotspotMap map[int]int
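For context, a conceptual sketch of how a hotspotMap like the one above can flag a dominant reader (the one-half threshold is a hypothetical choice; the real detection logic is outside this diff):

// isHotspot reports whether the reader at readerIdx supplied more than half
// of the last totalReads merged elements, making it worth reading concurrently.
func isHotspot(hotspotMap map[int]int, readerIdx, totalReads int) bool {
	return totalReads > 0 && hotspotMap[readerIdx]*2 > totalReads
}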
51 changes: 3 additions & 48 deletions br/pkg/lightning/backend/external/writer.go
@@ -22,21 +22,17 @@ import (
"path/filepath"
"slices"
"strconv"
"sync"
"time"

"github.com/docker/go-units"
"github.com/jfcg/sorty/v2"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"go.uber.org/zap"
@@ -106,10 +102,6 @@ type WriterBuilder struct {
propKeysDist uint64
onClose OnCloseFunc
keyDupeEncoding bool
-// This mutex is used to make sure the writer is flushed mutually exclusively in a TiDB server.
-mu *sync.Mutex
-
-bufferPool *membuf.Pool
}

// NewWriterBuilder creates a WriterBuilder.
@@ -159,24 +151,12 @@ func (b *WriterBuilder) SetOnCloseFunc(onClose OnCloseFunc) *WriterBuilder {
return b
}

-// SetBufferPool sets the buffer pool of the writer.
-func (b *WriterBuilder) SetBufferPool(bufferPool *membuf.Pool) *WriterBuilder {
-b.bufferPool = bufferPool
-return b
-}
-
// SetKeyDuplicationEncoding sets if the writer can distinguish duplicate key.
func (b *WriterBuilder) SetKeyDuplicationEncoding(val bool) *WriterBuilder {
b.keyDupeEncoding = val
return b
}

-// SetMutex sets the mutex of the writer.
-func (b *WriterBuilder) SetMutex(mu *sync.Mutex) *WriterBuilder {
-b.mu = mu
-return b
-}
-
// SetBlockSize sets the block size of pre-allocated buf in the writer.
func (b *WriterBuilder) SetBlockSize(blockSize int) *WriterBuilder {
b.blockSize = blockSize
@@ -190,10 +170,6 @@ func (b *WriterBuilder) Build(
prefix string,
writerID string,
) *Writer {
-bp := b.bufferPool
-if bp == nil {
-bp = membuf.NewPool()
-}
filenamePrefix := filepath.Join(prefix, writerID)
keyAdapter := common.KeyAdapter(common.NoopKeyAdapter{})
if b.keyDupeEncoding {
@@ -219,7 +195,6 @@ func (b *WriterBuilder) Build(
multiFileStats: make([]MultipleFilesStat, 1),
fileMinKeys: make([]tidbkv.Key, 0, multiFileStatNum),
fileMaxKeys: make([]tidbkv.Key, 0, multiFileStatNum),
-shareMu: b.mu,
}
ret.multiFileStats[0].Filenames = make([][2]string, 0, multiFileStatNum)
return ret
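With SetBufferPool and SetMutex gone, constructing a writer reduces to the remaining options. A hedged usage sketch, where onClose, memoryQuota, and extStore stand in for caller-provided values:

// Sketch of building an external-storage writer after this change.
writer := external.NewWriterBuilder().
	SetOnCloseFunc(onClose).                        // caller-provided callback
	SetKeyDuplicationEncoding(index.Meta().Unique). // detect duplicates for unique indexes
	SetMemorySizeLimit(memoryQuota).                // per-writer quota (see memSize below)
	Build(extStore, "1234/5", uuid.New().String())  // store, "jobID/subtaskID" prefix, writerID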
@@ -312,8 +287,6 @@ type Writer struct {
minKey tidbkv.Key
maxKey tidbkv.Key
totalSize uint64
-// This mutex is used to make sure the writer is flushed mutually exclusively in a TiDB server.
-shareMu *sync.Mutex
}

// WriteRow implements ingest.Writer.
@@ -406,10 +379,6 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
if len(w.kvLocations) == 0 {
return nil
}
-if w.shareMu != nil {
-w.shareMu.Lock()
-defer w.shareMu.Unlock()
-}

logger := logutil.Logger(ctx)
dataFile, statFile, dataWriter, statWriter, err := w.createStorageWriter(ctx)
@@ -470,23 +439,9 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
}()

sortStart := time.Now()
-if w.shareMu != nil {
-sorty.MaxGor = min(8, uint64(variable.GetDDLReorgWorkerCounter()))
-sorty.Sort(len(w.kvLocations), func(i, j, r, s int) bool {
-posi, posj := w.kvLocations[i], w.kvLocations[j]
-if bytes.Compare(w.getKeyByLoc(posi), w.getKeyByLoc(posj)) < 0 {
-if r != s {
-w.kvLocations[r], w.kvLocations[s] = w.kvLocations[s], w.kvLocations[r]
-}
-return true
-}
-return false
-})
-} else {
-slices.SortFunc(w.kvLocations, func(i, j kvLocation) int {
-return bytes.Compare(w.getKeyByLoc(i), w.getKeyByLoc(j))
-})
-}
+slices.SortFunc(w.kvLocations, func(i, j kvLocation) int {
+return bytes.Compare(w.getKeyByLoc(i), w.getKeyByLoc(j))
+})
sortDuration = time.Since(sortStart)

writeStartTime = time.Now()
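The sorty-based parallel sort (and the per-server mutex that serialized flushes) is replaced by a plain single-threaded sort per writer. A self-contained sketch of the surviving path, with a toy kvLocation standing in for the real type:

package main

import (
	"bytes"
	"fmt"
	"slices"
)

// kvLocation is a stand-in for the writer's real buffer-location type.
type kvLocation struct{ offset, length int }

func main() {
	buf := []byte("bbbaaaccc")
	locs := []kvLocation{{0, 3}, {3, 3}, {6, 3}}
	getKeyByLoc := func(l kvLocation) []byte { return buf[l.offset : l.offset+l.length] }

	// Each writer sorts only its own buffered locations, so the shared mutex
	// and sorty.MaxGor tuning are no longer needed.
	slices.SortFunc(locs, func(i, j kvLocation) int {
		return bytes.Compare(getKeyByLoc(i), getKeyByLoc(j))
	})
	fmt.Println(locs) // [{3 3} {0 3} {6 3}]
}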
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/kv2sql.go
@@ -104,7 +104,7 @@ func (t *TableKVDecoder) IterRawIndexKeys(h kv.Handle, rawRow []byte, fn func([]
}
iter := index.GenIndexKVIter(t.se.Vars.StmtCtx, indexValues, h, nil)
for iter.Valid() {
-indexKey, _, _, err := iter.Next(indexBuffer)
+indexKey, _, _, err := iter.Next(indexBuffer, nil)
if err != nil {
return err
}
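For context, the full loop around the changed call now reads roughly as follows (a sketch assembled from the lines shown above; the extra argument to Next is a buffer that this call site leaves nil, and the fn callback signature is assumed to be func([]byte) error):

iter := index.GenIndexKVIter(t.se.Vars.StmtCtx, indexValues, h, nil)
for iter.Valid() {
	indexKey, _, _, err := iter.Next(indexBuffer, nil)
	if err != nil {
		return err
	}
	if err := fn(indexKey); err != nil {
		return err
	}
}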
5 changes: 0 additions & 5 deletions br/pkg/lightning/backend/local/local.go
@@ -1579,11 +1579,6 @@ func (local *Backend) GetRegionSplitSizeKeys(ctx context.Context) (finalSize int
return GetRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
}

-// GetMutex returns the mutex of the backend.
-func (local *Backend) GetMutex() *sync.Mutex {
-return &local.mu
-}

// expose these variables to unit test.
var (
testJobToWorkerCh = make(chan *regionJob)
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
@@ -181,6 +181,7 @@ go_test(
"attributes_sql_test.go",
"backfilling_dispatcher_test.go",
"backfilling_test.go",
"bench_test.go",
"cancel_test.go",
"cluster_test.go",
"column_change_test.go",
17 changes: 3 additions & 14 deletions pkg/ddl/backfilling_operators.go
@@ -20,7 +20,6 @@ import (
"fmt"
"path"
"strconv"
"sync"
"sync/atomic"
"time"

@@ -42,7 +41,6 @@ import (
"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
"github.com/pingcap/tidb/pkg/resourcemanager/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
@@ -172,7 +170,6 @@ func NewWriteIndexToExternalStoragePipeline(
totalRowCount *atomic.Int64,
metricCounter prometheus.Counter,
onClose external.OnCloseFunc,
-bcctx ingest.BackendCtx,
) (*operator.AsyncPipeline, error) {
indexes := make([]table.Index, 0, len(idxInfos))
for _, idxInfo := range idxInfos {
@@ -189,8 +186,7 @@
for i := 0; i < poolSize; i++ {
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, copReadBatchSize())
}
-readerCnt := int(variable.GetDDLReorgWorkerCounter())
-writerCnt := 1
+readerCnt, writerCnt := expectedIngestWorkerCnt()

backend, err := storage.ParseBackend(extStoreURI, nil)
if err != nil {
@@ -201,21 +197,16 @@
return nil, err
}

-var shareMu *sync.Mutex
-if bcctx != nil {
-shareMu = bcctx.GetLocalBackend().GetMutex()
-}
-
memTotal, err := memory.MemTotal()
if err != nil {
return nil, err
}
-memSize := (memTotal / 2) / uint64(len(indexes))
+memSize := (memTotal / 2) / uint64(writerCnt) / uint64(len(indexes))

srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt)
writeOp := NewWriteExternalStoreOperator(
-ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, onClose, shareMu, memSize)
+ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, onClose, memSize)
sinkOp := newIndexWriteResultSink(ctx, nil, tbl, indexes, totalRowCount, metricCounter)

operator.Compose[TableScanTask](srcOp, scanOp)
Expand Down Expand Up @@ -499,7 +490,6 @@ func NewWriteExternalStoreOperator(
srcChunkPool chan *chunk.Chunk,
concurrency int,
onClose external.OnCloseFunc,
-shareMu *sync.Mutex,
memoryQuota uint64,
) *WriteExternalStoreOperator {
pool := workerpool.NewWorkerPool(
@@ -512,7 +502,6 @@
builder := external.NewWriterBuilder().
SetOnCloseFunc(onClose).
SetKeyDuplicationEncoding(index.Meta().Unique).
-SetMutex(shareMu).
SetMemorySizeLimit(memoryQuota)
writerID := uuid.New().String()
prefix := path.Join(strconv.Itoa(int(jobID)), strconv.Itoa(int(subtaskID)))
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_read_index.go
@@ -274,5 +274,5 @@ func (r *readIndexExecutor) buildExternalStorePipeline(
metrics.GenerateReorgLabel("add_idx_rate", r.job.SchemaName, tbl.Meta().Name.O))
return NewWriteIndexToExternalStoragePipeline(
opCtx, d.store, r.cloudStorageURI, r.d.sessPool, sessCtx, r.job.ID, subtaskID,
-tbl, r.indexes, start, end, totalRowCount, counter, onClose, r.bc)
+tbl, r.indexes, start, end, totalRowCount, counter, onClose)
}
100 changes: 100 additions & 0 deletions pkg/ddl/bench_test.go
@@ -0,0 +1,100 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl_test

import (
"testing"

"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/copr"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/stretchr/testify/require"
)

func BenchmarkExtractDatumByOffsets(b *testing.B) {
store, dom := testkit.CreateMockStoreAndDomain(b)
tk := testkit.NewTestKit(b, store)
tk.MustExec("use test")

tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a bigint, b int, index idx (b));")
for i := 0; i < 8; i++ {
tk.MustExec("insert into t values (?, ?)", i, i)
}
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(b, err)
tblInfo := tbl.Meta()
idxInfo := tblInfo.FindIndexByName("idx")
copCtx, err := copr.NewCopContextSingleIndex(tblInfo, idxInfo, tk.Session(), "")
require.NoError(b, err)
startKey := tbl.RecordPrefix()
endKey := startKey.PrefixNext()
txn, err := store.Begin()
require.NoError(b, err)
copChunk := ddl.FetchChunk4Test(copCtx, tbl.(table.PhysicalTable), startKey, endKey, store, 10)
require.NoError(b, err)
require.NoError(b, txn.Rollback())

handleDataBuf := make([]types.Datum, len(copCtx.GetBase().HandleOutputOffsets))

iter := chunk.NewIterator4Chunk(copChunk)
row := iter.Begin()
c := copCtx.GetBase()
offsets := copCtx.IndexColumnOutputOffsets(idxInfo.ID)

b.ResetTimer()
for i := 0; i < b.N; i++ {
ddl.ExtractDatumByOffsetsForTest(row, offsets, c.ExprColumnInfos, handleDataBuf)
}
}

func BenchmarkGenerateIndexKV(b *testing.B) {
store, dom := testkit.CreateMockStoreAndDomain(b)
tk := testkit.NewTestKit(b, store)
tk.MustExec("use test")

tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a bigint, b int, index idx (b));")
for i := 0; i < 8; i++ {
tk.MustExec("insert into t values (?, ?)", i, i)
}
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(b, err)
tblInfo := tbl.Meta()
idxInfo := tblInfo.FindIndexByName("idx")

index := tables.NewIndex(tblInfo.ID, tblInfo, idxInfo)
sctx := tk.Session().GetSessionVars().StmtCtx
idxDt := []types.Datum{types.NewIntDatum(10)}
buf := make([]byte, 0, 64)
handle := kv.IntHandle(1)

b.ResetTimer()
for i := 0; i < b.N; i++ {
buf = buf[:0]
iter := index.GenIndexKVIter(sctx, idxDt, handle, nil)
_, _, _, err = iter.Next(buf, nil)
if err != nil {
break
}
}
require.NoError(b, err)
}
1 change: 1 addition & 0 deletions pkg/ddl/ddl_worker.go
@@ -117,6 +117,7 @@ type JobContext struct {
tp string

resourceGroupName string
+cloudStorageURI string
}

// NewJobContext returns a new ddl job context.