Skip to content

Commit

Permalink
external: merge overlapping files concurrently (#48073)
Browse files Browse the repository at this point in the history
close #48072
  • Loading branch information
wjhuang2016 authored Nov 2, 2023
1 parent 07fd1a4 commit 028d7b8
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 43 deletions.
2 changes: 2 additions & 0 deletions br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ go_library(
"//pkg/sessionctx/variable",
"//pkg/util/hack",
"//pkg/util/logutil",
"//pkg/util/memory",
"//pkg/util/size",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_docker_go_units//:go-units",
"@com_github_google_uuid//:uuid",
"@com_github_jfcg_sorty_v2//:sorty",
"@com_github_pingcap_errors//:errors",
"@org_golang_x_sync//errgroup",
Expand Down
66 changes: 60 additions & 6 deletions br/pkg/lightning/backend/external/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,63 @@ package external
import (
"context"

"github.com/google/uuid"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// MergeOverlappingFiles reads from given files whose key range may overlap
// and writes to new sorted, nonoverlapping files.
func MergeOverlappingFiles(
ctx context.Context,
// MergeOverlappingFiles splits paths into at most `concurrency` contiguous
// batches and merges every batch in its own goroutine by calling
// mergeOverlappingFilesImpl. Each batch gets a freshly generated writer ID and
// an equal share of half of the memory reported by memory.MemTotal.
//
// An empty paths slice is a no-op and returns nil. A concurrency below 1 is
// treated as 1. The first error returned by any batch is returned to the
// caller, and the context handed to the remaining batches is cancelled.
func MergeOverlappingFiles(ctx context.Context, paths []string, store storage.ExternalStorage, readBufferSize int,
	newFilePrefix string, blockSize int, writeBatchCount uint64, propSizeDist uint64, propKeysDist uint64,
	onClose OnCloseFunc, concurrency int, checkHotspot bool) error {
	// Nothing to merge. Returning early also keeps the memory budget below
	// from being divided by a zero batch count.
	if len(paths) == 0 {
		return nil
	}

	// Clamp concurrency into [1, len(paths)]: a zero value would divide by
	// zero and a negative one would make the batching loop step backwards.
	if concurrency < 1 {
		concurrency = 1
	}
	if concurrency > len(paths) {
		concurrency = len(paths)
	}

	// Ceiling division, so the number of batches never exceeds concurrency
	// (floor division would yield e.g. 7 batches for 7 files and 4 workers).
	batchSize := (len(paths) + concurrency - 1) / concurrency
	dataFilesSlice := make([][]string, 0, concurrency)
	for start := 0; start < len(paths); start += batchSize {
		end := start + batchSize
		if end > len(paths) {
			end = len(paths)
		}
		dataFilesSlice = append(dataFilesSlice, paths[start:end])
	}

	memTotal, err := memory.MemTotal()
	if err != nil {
		return err
	}
	// Half of the total memory is shared evenly between the concurrent
	// merges; dataFilesSlice is guaranteed non-empty at this point.
	memSize := (memTotal / 2) / uint64(len(dataFilesSlice))

	// Use a group-scoped context so that a failing batch cancels its
	// siblings instead of letting them run to completion for nothing.
	eg, egCtx := errgroup.WithContext(ctx)
	for _, files := range dataFilesSlice {
		files := files
		eg.Go(func() error {
			return mergeOverlappingFilesImpl(
				egCtx,
				files,
				store,
				readBufferSize,
				newFilePrefix,
				uuid.New().String(),
				memSize,
				blockSize,
				writeBatchCount,
				propSizeDist,
				propKeysDist,
				onClose,
				checkHotspot,
			)
		})
	}
	return eg.Wait()
}

func mergeOverlappingFilesImpl(ctx context.Context,
paths []string,
store storage.ExternalStorage,
readBufferSize int,
Expand All @@ -28,20 +78,24 @@ func MergeOverlappingFiles(
if err != nil {
return err
}
defer iter.Close()
defer func() {
err := iter.Close()
if err != nil {
logutil.Logger(ctx).Warn("close iterator failed", zap.Error(err))
}
}()

writer := NewWriterBuilder().
SetMemorySizeLimit(memSizeLimit).
SetBlockSize(blockSize).
SetOnCloseFunc(onClose).
SetWriterBatchCount(writeBatchCount).
SetPropKeysDistance(propKeysDist).
SetPropSizeDistance(propSizeDist).
SetOnCloseFunc(onClose).
SetPropKeysDistance(propKeysDist).
Build(store, newFilePrefix, writerID)

// currently use same goroutine to do read and write. The main advantage is
// there's no KV copy and iter can reuse the buffer.

for iter.Next() {
err = writer.WriteRow(ctx, iter.Key(), iter.Value(), nil)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/external/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func TestWriterDuplicateDetect(t *testing.T) {
require.NoError(t, err)

// test MergeOverlappingFiles will not change duplicate detection functionality.
err = MergeOverlappingFiles(
err = mergeOverlappingFilesImpl(
ctx,
[]string{"/test/0/0"},
memStore,
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestWriterMultiFileStat(t *testing.T) {
allDataFiles[i] = fmt.Sprintf("/test/0/%d", i)
}

err = MergeOverlappingFiles(
err = mergeOverlappingFilesImpl(
ctx,
allDataFiles,
memStore,
Expand Down
21 changes: 4 additions & 17 deletions pkg/ddl/backfilling_merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import (
"strconv"
"sync"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
Expand Down Expand Up @@ -98,25 +98,12 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
return err
}

writerID := uuid.New().String()
prefix := path.Join(strconv.Itoa(int(m.jobID)), strconv.Itoa(int(subtask.ID)))

// TODO: config generated by plan.
return external.MergeOverlappingFiles(
ctx,
sm.DataFiles,
store,
64*1024,
prefix,
writerID,
256*size.MB,
external.DefaultBlockSize,
8*1024,
1*size.MB,
8*1024,
onClose,
true,
)
return external.MergeOverlappingFiles(ctx, sm.DataFiles, store, 64*1024, prefix,
external.DefaultBlockSize, 8*1024, 1*size.MB, 8*1024, onClose,
int(variable.GetDDLReorgWorkerCounter()), true)
}

func (*mergeSortExecutor) Cleanup(ctx context.Context) error {
Expand Down
20 changes: 3 additions & 17 deletions pkg/disttask/importinto/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

"github.com/docker/go-units"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
Expand Down Expand Up @@ -304,24 +303,11 @@ func (m *mergeSortStepExecutor) RunSubtask(ctx context.Context, subtask *proto.S
m.subtaskSortedKVMeta.MergeSummary(summary)
}

writerID := uuid.New().String()
prefix := subtaskPrefix(m.taskID, subtask.ID)

return external.MergeOverlappingFiles(
ctx,
sm.DataFiles,
m.controller.GlobalSortStore,
64*1024,
prefix,
writerID,
256*size.MB,
getKVGroupBlockSize(sm.KVGroup),
8*1024,
1*size.MB,
8*1024,
onClose,
false,
)
return external.MergeOverlappingFiles(ctx, sm.DataFiles, m.controller.GlobalSortStore, 64*1024,
prefix, getKVGroupBlockSize(sm.KVGroup), 8*1024, 1*size.MB, 8*1024,
onClose, int(m.taskMeta.Plan.ThreadCnt), false)
}

func (m *mergeSortStepExecutor) OnFinished(_ context.Context, subtask *proto.Subtask) error {
Expand Down
2 changes: 1 addition & 1 deletion tests/realtikvtest/addindextest2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "addindextest2_test",
timeout = "short",
timeout = "long",
srcs = [
"global_sort_test.go",
"main_test.go",
Expand Down

0 comments on commit 028d7b8

Please sign in to comment.