Skip to content

Commit

Permalink
global sort: merge to one file (#48142)
Browse files Browse the repository at this point in the history
ref #45719
  • Loading branch information
ywqzzy authored Nov 23, 2023
1 parent a85ba6f commit 9cf638f
Show file tree
Hide file tree
Showing 19 changed files with 791 additions and 46 deletions.
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"iter.go",
"kv_reader.go",
"merge.go",
"onefile_writer.go",
"split.go",
"stat_reader.go",
"util.go",
Expand Down Expand Up @@ -57,13 +58,14 @@ go_test(
"file_test.go",
"iter_test.go",
"merge_test.go",
"onefile_writer_test.go",
"split_test.go",
"util_test.go",
"writer_test.go",
],
embed = [":external"],
flaky = True,
shard_count = 45,
shard_count = 49,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
Expand Down
37 changes: 35 additions & 2 deletions br/pkg/lightning/backend/external/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,37 @@ func writeExternalFile(s *writeTestSuite) {
}
}

// writeExternalOneFile drains s.source into a writer created with
// BuildOneFile on s.store and then closes that writer. The optional suite
// hooks (beforeCreateWriter, beforeWriterClose, afterWriterClose) are invoked
// around writer creation and close when they are set.
//
// Every error returned by the writer, including the one from Init, is checked
// with intest.AssertNoError.
func writeExternalOneFile(s *writeTestSuite) {
	ctx := context.Background()
	builder := NewWriterBuilder().
		SetMemorySizeLimit(uint64(s.memoryLimit))

	if s.beforeCreateWriter != nil {
		s.beforeCreateWriter()
	}
	writer := builder.BuildOneFile(
		s.store, "test/external", "writerID")
	// Do not discard the Init error: a writer that failed to initialize would
	// otherwise only surface as a less obvious failure in WriteRow or Close.
	// 20 MiB is passed as Init's size argument.
	intest.AssertNoError(writer.Init(ctx, 20*1024*1024))

	// The source signals exhaustion by returning a nil key. The third return
	// value of next() is not needed by the one-file writer and is dropped.
	key, val, _ := s.source.next()
	for key != nil {
		err := writer.WriteRow(ctx, key, val)
		intest.AssertNoError(err)
		key, val, _ = s.source.next()
	}

	if s.beforeWriterClose != nil {
		s.beforeWriterClose()
	}
	err := writer.Close(ctx)
	intest.AssertNoError(err)
	if s.afterWriterClose != nil {
		s.afterWriterClose()
	}
}

func TestCompareWriter(t *testing.T) {
store := openTestingStorage(t)
source := newAscendingKeySource(20, 100, 10000000)
sourceKVNum := 10000000
source := newAscendingKeySource(20, 100, sourceKVNum)
memoryLimit := 64 * 1024 * 1024
fileIdx := 0
var (
Expand Down Expand Up @@ -210,10 +238,15 @@ func TestCompareWriter(t *testing.T) {
baseSpeed := float64(source.outputSize()) / elapsed.Seconds() / 1024 / 1024
t.Logf("base speed for %d bytes: %.2f MB/s", source.outputSize(), baseSpeed)

suite.source = newAscendingKeySource(20, 100, 10000000)
suite.source = newAscendingKeySource(20, 100, sourceKVNum)
writeExternalFile(suite)
writerSpeed := float64(source.outputSize()) / elapsed.Seconds() / 1024 / 1024
t.Logf("writer speed for %d bytes: %.2f MB/s", source.outputSize(), writerSpeed)

suite.source = newAscendingKeySource(20, 100, sourceKVNum)
writeExternalOneFile(suite)
writerSpeed = float64(source.outputSize()) / elapsed.Seconds() / 1024 / 1024
t.Logf("one file writer speed for %d bytes: %.2f MB/s", source.outputSize(), writerSpeed)
}

type readTestSuite struct {
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ func (e *Engine) LoadIngestData(
zap.Int("concurrency", concurrency),
zap.Int("ranges", len(regionRanges)),
zap.Int("range-groups", len(rangeGroups)),
zap.Int("data-files", len(e.dataFiles)),
zap.Int("num-data-files", len(e.dataFiles)),
zap.Int("num-stat-files", len(e.statsFiles)),
zap.Bool("check-hotspot", e.checkHotspot),
)
eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)
Expand Down
2 changes: 0 additions & 2 deletions br/pkg/lightning/backend/external/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,6 @@ func (i *mergeIter[T, R]) currElem() T {
// next forwards the iterator to the next element. It returns false if there is
// no available element.
func (i *mergeIter[T, R]) next() bool {
var zeroT T
i.curr = zeroT
if i.lastReaderIdx >= 0 {
if i.checkHotspot {
i.hotspotMap[i.lastReaderIdx] = i.hotspotMap[i.lastReaderIdx] + 1
Expand Down
113 changes: 105 additions & 8 deletions br/pkg/lightning/backend/external/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ package external

import (
"context"
"errors"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/storage"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// MergeOverlappingFiles reads from given files whose key range may overlap
// and writes to new sorted, nonoverlapping files.
func MergeOverlappingFiles(ctx context.Context, paths []string, store storage.ExternalStorage, readBufferSize int,
func MergeOverlappingFiles(ctx context.Context, paths []string, store storage.ExternalStorage, partSize int64, readBufferSize int,
newFilePrefix string, blockSize int, writeBatchCount uint64, propSizeDist uint64, propKeysDist uint64,
onClose OnCloseFunc, concurrency int, checkHotspot bool) error {
var dataFilesSlice [][]string
Expand All @@ -37,13 +39,15 @@ func MergeOverlappingFiles(ctx context.Context, paths []string, store storage.Ex
zap.Int("concurrency", concurrency))
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(concurrency)
partSize = max(int64(5*size.MB), partSize+int64(1*size.MB))
for _, files := range dataFilesSlice {
files := files
eg.Go(func() error {
return mergeOverlappingFilesImpl(
return MergeOverlappingFilesV2(
egCtx,
files,
store,
partSize,
readBufferSize,
newFilePrefix,
uuid.New().String(),
Expand All @@ -60,6 +64,7 @@ func MergeOverlappingFiles(ctx context.Context, paths []string, store storage.Ex
return eg.Wait()
}

// mergeOverlappingFilesImpl is unused for now: MergeOverlappingFiles calls
// MergeOverlappingFilesV2 instead.
func mergeOverlappingFilesImpl(ctx context.Context,
paths []string,
store storage.ExternalStorage,
Expand All @@ -81,6 +86,69 @@ func mergeOverlappingFilesImpl(ctx context.Context,
defer func() {
task.End(zap.ErrorLevel, err)
}()

zeroOffsets := make([]uint64, len(paths))
iter, err := NewMergeKVIter(ctx, paths, zeroOffsets, store, readBufferSize, checkHotspot)
if err != nil {
return err
}
defer func() {
err := iter.Close()
if err != nil {
logutil.Logger(ctx).Warn("close iterator failed", zap.Error(err))
}
}()

writer := NewWriterBuilder().
SetMemorySizeLimit(memSizeLimit).
SetBlockSize(blockSize).
SetOnCloseFunc(onClose).
SetWriterBatchCount(writeBatchCount).
SetPropSizeDistance(propSizeDist).
SetPropKeysDistance(propKeysDist).
Build(store, newFilePrefix, writerID)

// currently use same goroutine to do read and write. The main advantage is
// there's no KV copy and iter can reuse the buffer.
for iter.Next() {
err = writer.WriteRow(ctx, iter.Key(), iter.Value(), nil)
if err != nil {
return err
}
}
err = iter.Error()
if err != nil {
return err
}
return writer.Close(ctx)
}

// MergeOverlappingFilesV2 reads from the given files, whose key ranges may
// overlap, and writes all of their data to a single new sorted file.
func MergeOverlappingFilesV2(
ctx context.Context,
paths []string,
store storage.ExternalStorage,
partSize int64,
readBufferSize int,
newFilePrefix string,
writerID string,
memSizeLimit uint64,
blockSize int,
writeBatchCount uint64,
propSizeDist uint64,
propKeysDist uint64,
onClose OnCloseFunc,
checkHotspot bool,
) (err error) {
task := log.BeginTask(logutil.Logger(ctx).With(
zap.String("writer-id", writerID),
zap.Int("file-count", len(paths)),
), "merge overlapping files")
defer func() {
task.End(zap.ErrorLevel, err)
}()

failpoint.Inject("mergeOverlappingFilesImpl", func(val failpoint.Value) {
if val.(string) == paths[0] {
failpoint.Return(errors.New("injected error"))
Expand All @@ -107,16 +175,24 @@ func mergeOverlappingFilesImpl(ctx context.Context,
writer := NewWriterBuilder().
SetMemorySizeLimit(memSizeLimit).
SetBlockSize(blockSize).
SetOnCloseFunc(onClose).
SetWriterBatchCount(writeBatchCount).
SetPropSizeDistance(propSizeDist).
SetPropKeysDistance(propKeysDist).
Build(store, newFilePrefix, writerID)
SetPropSizeDistance(propSizeDist).
SetOnCloseFunc(onClose).
BuildOneFile(store, newFilePrefix, writerID)
err = writer.Init(ctx, partSize)
if err != nil {
return nil
}
var minKey, maxKey tidbkv.Key

// currently use same goroutine to do read and write. The main advantage is
// there's no KV copy and iter can reuse the buffer.
for iter.Next() {
err = writer.WriteRow(ctx, iter.Key(), iter.Value(), nil)
if len(minKey) == 0 {
minKey = tidbkv.Key(iter.Key()).Clone()
}
err = writer.WriteRow(ctx, iter.Key(), iter.Value())
if err != nil {
return err
}
Expand All @@ -125,5 +201,26 @@ func mergeOverlappingFilesImpl(ctx context.Context,
if err != nil {
return err
}
return writer.Close(ctx)
maxKey = tidbkv.Key(iter.Key()).Clone()

var stat MultipleFilesStat
stat.Filenames = append(stat.Filenames,
[2]string{writer.dataFile, writer.statFile})
stat.build([]tidbkv.Key{minKey}, []tidbkv.Key{maxKey})
if onClose != nil {
onClose(&WriterSummary{
WriterID: writer.writerID,
Seq: 0,
Min: minKey,
Max: maxKey,
TotalSize: writer.totalSize,
MultipleFilesStats: []MultipleFilesStat{stat},
})
}

err = writer.Close(ctx)
if err != nil {
return err
}
return nil
}
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/external/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestMergeOverlappingFiles(t *testing.T) {
[]string{"a", "b", "c", "d", "e"},
nil,
1,
1,
"",
1,
1,
Expand Down
Loading

0 comments on commit 9cf638f

Please sign in to comment.