diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index 93299ec3e0c6b..0280abc26c41a 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "iter.go", "kv_reader.go", "merge.go", + "onefile_writer.go", "split.go", "stat_reader.go", "util.go", @@ -57,13 +58,14 @@ go_test( "file_test.go", "iter_test.go", "merge_test.go", + "onefile_writer_test.go", "split_test.go", "util_test.go", "writer_test.go", ], embed = [":external"], flaky = True, - shard_count = 45, + shard_count = 49, deps = [ "//br/pkg/lightning/backend/kv", "//br/pkg/lightning/common", diff --git a/br/pkg/lightning/backend/external/bench_test.go b/br/pkg/lightning/backend/external/bench_test.go index 798a81eae8025..12378cd8707ee 100644 --- a/br/pkg/lightning/backend/external/bench_test.go +++ b/br/pkg/lightning/backend/external/bench_test.go @@ -166,9 +166,37 @@ func writeExternalFile(s *writeTestSuite) { } } +func writeExternalOneFile(s *writeTestSuite) { + ctx := context.Background() + builder := NewWriterBuilder(). + SetMemorySizeLimit(uint64(s.memoryLimit)) + + if s.beforeCreateWriter != nil { + s.beforeCreateWriter() + } + writer := builder.BuildOneFile( + s.store, "test/external", "writerID") + _ = writer.Init(ctx, 20*1024*1024) + key, val, _ := s.source.next() + for key != nil { + err := writer.WriteRow(ctx, key, val) + intest.AssertNoError(err) + key, val, _ = s.source.next() + } + if s.beforeWriterClose != nil { + s.beforeWriterClose() + } + err := writer.Close(ctx) + intest.AssertNoError(err) + if s.afterWriterClose != nil { + s.afterWriterClose() + } +} + func TestCompareWriter(t *testing.T) { store := openTestingStorage(t) - source := newAscendingKeySource(20, 100, 10000000) + sourceKVNum := 10000000 + source := newAscendingKeySource(20, 100, sourceKVNum) memoryLimit := 64 * 1024 * 1024 fileIdx := 0 var ( @@ -210,10 +238,15 @@ func TestCompareWriter(t *testing.T) { baseSpeed := float64(source.outputSize()) / elapsed.Seconds() / 1024 / 1024 t.Logf("base speed for %d bytes: %.2f MB/s", source.outputSize(), baseSpeed) - suite.source = newAscendingKeySource(20, 100, 10000000) + suite.source = newAscendingKeySource(20, 100, sourceKVNum) writeExternalFile(suite) writerSpeed := float64(source.outputSize()) / elapsed.Seconds() / 1024 / 1024 t.Logf("writer speed for %d bytes: %.2f MB/s", source.outputSize(), writerSpeed) + + suite.source = newAscendingKeySource(20, 100, sourceKVNum) + writeExternalOneFile(suite) + writerSpeed = float64(source.outputSize()) / elapsed.Seconds() / 1024 / 1024 + t.Logf("one file writer speed for %d bytes: %.2f MB/s", source.outputSize(), writerSpeed) } type readTestSuite struct { diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go index 900bea084fd2a..4465534053917 100644 --- a/br/pkg/lightning/backend/external/engine.go +++ b/br/pkg/lightning/backend/external/engine.go @@ -168,7 +168,8 @@ func (e *Engine) LoadIngestData( zap.Int("concurrency", concurrency), zap.Int("ranges", len(regionRanges)), zap.Int("range-groups", len(rangeGroups)), - zap.Int("data-files", len(e.dataFiles)), + zap.Int("num-data-files", len(e.dataFiles)), + zap.Int("num-stat-files", len(e.statsFiles)), zap.Bool("check-hotspot", e.checkHotspot), ) eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx) diff --git a/br/pkg/lightning/backend/external/iter.go b/br/pkg/lightning/backend/external/iter.go index 768f7bd979a92..547d147132f1d 
100644 --- a/br/pkg/lightning/backend/external/iter.go +++ b/br/pkg/lightning/backend/external/iter.go @@ -230,8 +230,6 @@ func (i *mergeIter[T, R]) currElem() T { // next forwards the iterator to the next element. It returns false if there is // no available element. func (i *mergeIter[T, R]) next() bool { - var zeroT T - i.curr = zeroT if i.lastReaderIdx >= 0 { if i.checkHotspot { i.hotspotMap[i.lastReaderIdx] = i.hotspotMap[i.lastReaderIdx] + 1 diff --git a/br/pkg/lightning/backend/external/merge.go b/br/pkg/lightning/backend/external/merge.go index dce4b4aa2959d..a2eb92c924733 100644 --- a/br/pkg/lightning/backend/external/merge.go +++ b/br/pkg/lightning/backend/external/merge.go @@ -2,20 +2,22 @@ package external import ( "context" + "errors" "github.com/google/uuid" - "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/storage" + tidbkv "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/size" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) // MergeOverlappingFiles reads from given files whose key range may overlap // and writes to new sorted, nonoverlapping files. -func MergeOverlappingFiles(ctx context.Context, paths []string, store storage.ExternalStorage, readBufferSize int, +func MergeOverlappingFiles(ctx context.Context, paths []string, store storage.ExternalStorage, partSize int64, readBufferSize int, newFilePrefix string, blockSize int, writeBatchCount uint64, propSizeDist uint64, propKeysDist uint64, onClose OnCloseFunc, concurrency int, checkHotspot bool) error { var dataFilesSlice [][]string @@ -37,13 +39,15 @@ func MergeOverlappingFiles(ctx context.Context, paths []string, store storage.Ex zap.Int("concurrency", concurrency)) eg, egCtx := errgroup.WithContext(ctx) eg.SetLimit(concurrency) + partSize = max(int64(5*size.MB), partSize+int64(1*size.MB)) for _, files := range dataFilesSlice { files := files eg.Go(func() error { - return mergeOverlappingFilesImpl( + return MergeOverlappingFilesV2( egCtx, files, store, + partSize, readBufferSize, newFilePrefix, uuid.New().String(), @@ -60,6 +64,7 @@ func MergeOverlappingFiles(ctx context.Context, paths []string, store storage.Ex return eg.Wait() } +// unused for now. func mergeOverlappingFilesImpl(ctx context.Context, paths []string, store storage.ExternalStorage, @@ -81,6 +86,69 @@ func mergeOverlappingFilesImpl(ctx context.Context, defer func() { task.End(zap.ErrorLevel, err) }() + + zeroOffsets := make([]uint64, len(paths)) + iter, err := NewMergeKVIter(ctx, paths, zeroOffsets, store, readBufferSize, checkHotspot) + if err != nil { + return err + } + defer func() { + err := iter.Close() + if err != nil { + logutil.Logger(ctx).Warn("close iterator failed", zap.Error(err)) + } + }() + + writer := NewWriterBuilder(). + SetMemorySizeLimit(memSizeLimit). + SetBlockSize(blockSize). + SetOnCloseFunc(onClose). + SetWriterBatchCount(writeBatchCount). + SetPropSizeDistance(propSizeDist). + SetPropKeysDistance(propKeysDist). + Build(store, newFilePrefix, writerID) + + // currently use same goroutine to do read and write. The main advantage is + // there's no KV copy and iter can reuse the buffer. 
+ for iter.Next() { + err = writer.WriteRow(ctx, iter.Key(), iter.Value(), nil) + if err != nil { + return err + } + } + err = iter.Error() + if err != nil { + return err + } + return writer.Close(ctx) +} + +// MergeOverlappingFilesV2 reads from given files whose key range may overlap +// and writes to one new sorted, nonoverlapping file. +func MergeOverlappingFilesV2( + ctx context.Context, + paths []string, + store storage.ExternalStorage, + partSize int64, + readBufferSize int, + newFilePrefix string, + writerID string, + memSizeLimit uint64, + blockSize int, + writeBatchCount uint64, + propSizeDist uint64, + propKeysDist uint64, + onClose OnCloseFunc, + checkHotspot bool, +) (err error) { + task := log.BeginTask(logutil.Logger(ctx).With( + zap.String("writer-id", writerID), + zap.Int("file-count", len(paths)), + ), "merge overlapping files") + defer func() { + task.End(zap.ErrorLevel, err) + }() + failpoint.Inject("mergeOverlappingFilesImpl", func(val failpoint.Value) { if val.(string) == paths[0] { failpoint.Return(errors.New("injected error")) @@ -107,16 +175,24 @@ func mergeOverlappingFilesImpl(ctx context.Context, writer := NewWriterBuilder(). SetMemorySizeLimit(memSizeLimit). SetBlockSize(blockSize). - SetOnCloseFunc(onClose). SetWriterBatchCount(writeBatchCount). - SetPropSizeDistance(propSizeDist). SetPropKeysDistance(propKeysDist). - Build(store, newFilePrefix, writerID) + SetPropSizeDistance(propSizeDist). + SetOnCloseFunc(onClose). + BuildOneFile(store, newFilePrefix, writerID) + err = writer.Init(ctx, partSize) + if err != nil { + return err + } + var minKey, maxKey tidbkv.Key // currently use same goroutine to do read and write. The main advantage is // there's no KV copy and iter can reuse the buffer. for iter.Next() { - err = writer.WriteRow(ctx, iter.Key(), iter.Value(), nil) + if len(minKey) == 0 { + minKey = tidbkv.Key(iter.Key()).Clone() + } + err = writer.WriteRow(ctx, iter.Key(), iter.Value()) + if err != nil { + return err + } @@ -125,5 +201,26 @@ func mergeOverlappingFilesImpl(ctx context.Context, if err != nil { return err } - return writer.Close(ctx) + maxKey = tidbkv.Key(iter.Key()).Clone() + + var stat MultipleFilesStat + stat.Filenames = append(stat.Filenames, + [2]string{writer.dataFile, writer.statFile}) + stat.build([]tidbkv.Key{minKey}, []tidbkv.Key{maxKey}) + if onClose != nil { + onClose(&WriterSummary{ + WriterID: writer.writerID, + Seq: 0, + Min: minKey, + Max: maxKey, + TotalSize: writer.totalSize, + MultipleFilesStats: []MultipleFilesStat{stat}, + }) + } + + err = writer.Close(ctx) + if err != nil { + return err + } + return nil } diff --git a/br/pkg/lightning/backend/external/merge_test.go b/br/pkg/lightning/backend/external/merge_test.go index 42078e75ce97b..799121c4fc7fb 100644 --- a/br/pkg/lightning/backend/external/merge_test.go +++ b/br/pkg/lightning/backend/external/merge_test.go @@ -32,6 +32,7 @@ func TestMergeOverlappingFiles(t *testing.T) { []string{"a", "b", "c", "d", "e"}, nil, 1, + 1, "", 1, 1, diff --git a/br/pkg/lightning/backend/external/onefile_writer.go b/br/pkg/lightning/backend/external/onefile_writer.go new file mode 100644 index 0000000000000..b0dea86661744 --- /dev/null +++ b/br/pkg/lightning/backend/external/onefile_writer.go @@ -0,0 +1,158 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package external + +import ( + "context" + "encoding/binary" + "path/filepath" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/membuf" + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/size" + "go.uber.org/zap" +) + +// OneFileWriter is used to write data into external storage +// with only one file for data and stat. +type OneFileWriter struct { + // storage related. + store storage.ExternalStorage + kvStore *KeyValueStore + kvBuffer *membuf.Buffer + + // Statistic information per writer. + totalSize uint64 + rc *rangePropertiesCollector + + // file information. + writerID string + filenamePrefix string + dataFile string + statFile string + dataWriter storage.ExternalFileWriter + statWriter storage.ExternalFileWriter + + onClose OnCloseFunc + closed bool + + logger *zap.Logger +} + +// initWriter inits the underlying dataFile/statFile path, dataWriter/statWriter for OneFileWriter. +func (w *OneFileWriter) initWriter(ctx context.Context, partSize int64) ( + err error, +) { + w.dataFile = filepath.Join(w.filenamePrefix, "one-file") + w.dataWriter, err = w.store.Create(ctx, w.dataFile, &storage.WriterOption{Concurrency: 20, PartSize: partSize}) + if err != nil { + return err + } + w.statFile = filepath.Join(w.filenamePrefix+statSuffix, "one-file") + w.statWriter, err = w.store.Create(ctx, w.statFile, &storage.WriterOption{Concurrency: 20, PartSize: int64(5 * size.MB)}) + if err != nil { + _ = w.dataWriter.Close(ctx) + return err + } + w.logger.Info("one file writer", zap.String("data-file", w.dataFile), zap.String("stat-file", w.statFile)) + return nil +} + +// Init inits the OneFileWriter and its underlying KeyValueStore. +func (w *OneFileWriter) Init(ctx context.Context, partSize int64) (err error) { + w.logger = logutil.Logger(ctx) + err = w.initWriter(ctx, partSize) + if err != nil { + return err + } + w.kvStore, err = NewKeyValueStore(ctx, w.dataWriter, w.rc) + return err +} + +// WriteRow implements ingest.Writer. +func (w *OneFileWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte) error { + // 1. encode data and write to kvStore. + keyLen := len(idxKey) + length := len(idxKey) + len(idxVal) + lengthBytes*2 + buf, _ := w.kvBuffer.AllocBytesWithSliceLocation(length) + if buf == nil { + w.kvBuffer.Reset() + buf, _ = w.kvBuffer.AllocBytesWithSliceLocation(length) + // we now don't support KV larger than blockSize + if buf == nil { + return errors.Errorf("failed to allocate kv buffer: %d", length) + } + // 2. write statistics if one kvBuffer is used. + w.kvStore.Close() + encodedStat := w.rc.encode() + _, err := w.statWriter.Write(ctx, encodedStat) + if err != nil { + return err + } + w.rc.reset() + } + binary.BigEndian.AppendUint64(buf[:0], uint64(keyLen)) + copy(buf[lengthBytes:], idxKey) + binary.BigEndian.AppendUint64(buf[lengthBytes+keyLen:lengthBytes+keyLen], uint64(len(idxVal))) + copy(buf[lengthBytes*2+keyLen:], idxVal) + w.kvStore.addEncodedData(buf[:length]) + w.totalSize += uint64(keyLen + len(idxVal)) + return nil +} + +// Close closes the writer. 
+func (w *OneFileWriter) Close(ctx context.Context) error { + if w.closed { + return errors.Errorf("writer %s has been closed", w.writerID) + } + err := w.closeImpl(ctx) + if err != nil { + return err + } + w.logger.Info("close one file writer", + zap.String("writerID", w.writerID)) + + w.totalSize = 0 + w.closed = true + return nil +} + +func (w *OneFileWriter) closeImpl(ctx context.Context) (err error) { + // 1. write remaining statistic. + w.kvStore.Close() + encodedStat := w.rc.encode() + _, err = w.statWriter.Write(ctx, encodedStat) + if err != nil { + return err + } + w.rc.reset() + // 2. close data writer. + err1 := w.dataWriter.Close(ctx) + if err1 != nil { + w.logger.Error("Close data writer failed", zap.Error(err)) + err = err1 + return + } + // 4. close stat writer. + err2 := w.statWriter.Close(ctx) + if err2 != nil { + w.logger.Error("Close stat writer failed", zap.Error(err)) + err = err2 + return + } + return nil +} diff --git a/br/pkg/lightning/backend/external/onefile_writer_test.go b/br/pkg/lightning/backend/external/onefile_writer_test.go new file mode 100644 index 0000000000000..be1bb8a717ad6 --- /dev/null +++ b/br/pkg/lightning/backend/external/onefile_writer_test.go @@ -0,0 +1,359 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package external + +import ( + "bytes" + "context" + "fmt" + "io" + "path" + "slices" + "strconv" + "testing" + "time" + + "github.com/cockroachdb/pebble" + "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/membuf" + "github.com/pingcap/tidb/br/pkg/storage" + dbkv "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/util/size" + "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" +) + +func TestOnefileWriterBasic(t *testing.T) { + seed := time.Now().Unix() + rand.Seed(uint64(seed)) + t.Logf("seed: %d", seed) + ctx := context.Background() + memStore := storage.NewMemStorage() + + // 1. write into one file. + // 2. read kv file and check result. + // 3. read stat file and check result. + writer := NewWriterBuilder(). + SetPropSizeDistance(100). + SetPropKeysDistance(2). 
+ BuildOneFile(memStore, "/test", "0") + + err := writer.Init(ctx, 5*1024*1024) + require.NoError(t, err) + + kvCnt := 100 + kvs := make([]common.KvPair, kvCnt) + for i := 0; i < kvCnt; i++ { + randLen := rand.Intn(10) + 1 + kvs[i].Key = make([]byte, randLen) + _, err := rand.Read(kvs[i].Key) + require.NoError(t, err) + randLen = rand.Intn(10) + 1 + kvs[i].Val = make([]byte, randLen) + _, err = rand.Read(kvs[i].Val) + require.NoError(t, err) + } + + for _, item := range kvs { + err := writer.WriteRow(ctx, item.Key, item.Val) + require.NoError(t, err) + } + + err = writer.Close(ctx) + require.NoError(t, err) + + bufSize := rand.Intn(100) + 1 + kvReader, err := newKVReader(ctx, "/test/0/one-file", memStore, 0, bufSize) + require.NoError(t, err) + for i := 0; i < kvCnt; i++ { + key, value, err := kvReader.nextKV() + require.NoError(t, err) + require.Equal(t, kvs[i].Key, key) + require.Equal(t, kvs[i].Val, value) + } + _, _, err = kvReader.nextKV() + require.Equal(t, io.EOF, err) + require.NoError(t, kvReader.Close()) + + statReader, err := newStatsReader(ctx, memStore, "/test/0_stat/one-file", bufSize) + require.NoError(t, err) + + var keyCnt uint64 = 0 + for { + p, err := statReader.nextProp() + if err == io.EOF { + break + } + require.NoError(t, err) + keyCnt += p.keys + } + require.Equal(t, uint64(kvCnt), keyCnt) + require.NoError(t, statReader.Close()) +} + +func TestOnefileWriterStat(t *testing.T) { + distanceCntArr := []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + kvCntArr := []int{10, 100, 200, 1000} // won't large than DefaultMemSizeLimit. + // 1. write into one file. + // 2. read kv file and check result. + // 3. read stat file and check result. + for _, kvCnt := range kvCntArr { + for _, distance := range distanceCntArr { + checkOneFileWriterStatWithDistance(t, kvCnt, distance, DefaultMemSizeLimit, "test"+strconv.Itoa(int(distance))) + } + } +} + +func checkOneFileWriterStatWithDistance(t *testing.T, kvCnt int, keysDistance uint64, memSizeLimit uint64, prefix string) { + ctx := context.Background() + memStore := storage.NewMemStorage() + writer := NewWriterBuilder(). + SetPropSizeDistance(100). + SetPropKeysDistance(keysDistance). 
+ BuildOneFile(memStore, "/"+prefix, "0") + + err := writer.Init(ctx, 5*1024*1024) + require.NoError(t, err) + kvs := make([]common.KvPair, 0, kvCnt) + for i := 0; i < kvCnt; i++ { + kvs = append(kvs, common.KvPair{ + Key: []byte(fmt.Sprintf("key%02d", i)), + Val: []byte("56789"), + }) + } + for _, item := range kvs { + err := writer.WriteRow(ctx, item.Key, item.Val) + require.NoError(t, err) + } + err = writer.Close(ctx) + require.NoError(t, err) + + bufSize := rand.Intn(100) + 1 + kvReader, err := newKVReader(ctx, "/"+prefix+"/0/one-file", memStore, 0, bufSize) + require.NoError(t, err) + for i := 0; i < kvCnt; i++ { + key, value, err := kvReader.nextKV() + require.NoError(t, err) + require.Equal(t, kvs[i].Key, key) + require.Equal(t, kvs[i].Val, value) + } + _, _, err = kvReader.nextKV() + require.Equal(t, io.EOF, err) + require.NoError(t, kvReader.Close()) + + statReader, err := newStatsReader(ctx, memStore, "/"+prefix+"/0_stat/one-file", bufSize) + require.NoError(t, err) + + var keyCnt uint64 = 0 + idx := 0 + for { + p, err := statReader.nextProp() + if err == io.EOF { + break + } + require.NoError(t, err) + keyCnt += p.keys + require.Equal(t, kvs[idx].Key, p.firstKey) + lastIdx := idx + int(keysDistance) - 1 + if lastIdx >= len(kvs) { + lastIdx = len(kvs) - 1 + } + require.Equal(t, kvs[lastIdx].Key, p.lastKey) + idx += int(keysDistance) + } + require.Equal(t, uint64(kvCnt), keyCnt) + require.NoError(t, statReader.Close()) +} + +func TestMergeOverlappingFilesV2(t *testing.T) { + // 1. Write to 5 files. + // 2. merge 5 files into one file. + // 3. read one file and check result. + // 4. check duplicate key. + ctx := context.Background() + memStore := storage.NewMemStorage() + writer := NewWriterBuilder(). + SetPropKeysDistance(2). + SetMemorySizeLimit(1000). + SetKeyDuplicationEncoding(true). + Build(memStore, "/test", "0") + + kvCount := 2000000 + for i := 0; i < kvCount; i++ { + v := i + if v == kvCount/2 { + v-- // insert a duplicate key. 
+ } + key, val := []byte{byte(v)}, []byte{byte(v)} + err := writer.WriteRow(ctx, key, val, dbkv.IntHandle(i)) + require.NoError(t, err) + } + err := writer.Close(ctx) + require.NoError(t, err) + + err = MergeOverlappingFilesV2( + ctx, + []string{"/test/0/0", "/test/0/1", "/test/0/2", "/test/0/3", "/test/0/4"}, + memStore, + int64(5*size.MB), + 100, + "/test2", + "mergeID", + 1000, + 1000, + 8*1024, + 1*size.MB, + 2, + nil, + true, + ) + require.NoError(t, err) + + keys := make([][]byte, 0, kvCount) + values := make([][]byte, 0, kvCount) + + kvReader, err := newKVReader(ctx, "/test2/mergeID/one-file", memStore, 0, 100) + require.NoError(t, err) + for i := 0; i < kvCount; i++ { + key, value, err := kvReader.nextKV() + require.NoError(t, err) + clonedKey := make([]byte, len(key)) + copy(clonedKey, key) + clonedVal := make([]byte, len(value)) + copy(clonedVal, value) + keys = append(keys, clonedKey) + values = append(values, clonedVal) + } + _, _, err = kvReader.nextKV() + require.Equal(t, io.EOF, err) + require.NoError(t, kvReader.Close()) + + dir := t.TempDir() + db, err := pebble.Open(path.Join(dir, "duplicate"), nil) + require.NoError(t, err) + keyAdapter := common.DupDetectKeyAdapter{} + data := &MemoryIngestData{ + keyAdapter: keyAdapter, + duplicateDetection: true, + duplicateDB: db, + dupDetectOpt: common.DupDetectOpt{ReportErrOnDup: true}, + keys: keys, + values: values, + ts: 123, + } + pool := membuf.NewPool() + defer pool.Destroy() + iter := data.NewIter(ctx, nil, nil, pool) + + for iter.First(); iter.Valid(); iter.Next() { + } + err = iter.Error() + require.Error(t, err) + require.Contains(t, err.Error(), "found duplicate key") +} + +func TestOnefileWriterManyRows(t *testing.T) { + // 1. write into one file with sorted order. + // 2. merge one file. + // 3. read kv file and check the result. + // 4. check the writeSummary. + ctx := context.Background() + memStore := storage.NewMemStorage() + writer := NewWriterBuilder(). + SetPropKeysDistance(2). + SetMemorySizeLimit(1000). 
+ BuildOneFile(memStore, "/test", "0") + + err := writer.Init(ctx, 5*1024*1024) + require.NoError(t, err) + + kvCnt := 100000 + expectedTotalSize := 0 + kvs := make([]common.KvPair, kvCnt) + for i := 0; i < kvCnt; i++ { + randLen := rand.Intn(10) + 1 + kvs[i].Key = make([]byte, randLen) + _, err := rand.Read(kvs[i].Key) + expectedTotalSize += randLen + + require.NoError(t, err) + randLen = rand.Intn(10) + 1 + kvs[i].Val = make([]byte, randLen) + _, err = rand.Read(kvs[i].Val) + require.NoError(t, err) + expectedTotalSize += randLen + } + + slices.SortFunc(kvs, func(i, j common.KvPair) int { + return bytes.Compare(i.Key, j.Key) + }) + + for _, item := range kvs { + err := writer.WriteRow(ctx, item.Key, item.Val) + require.NoError(t, err) + } + err = writer.Close(ctx) + require.NoError(t, err) + + var resSummary *WriterSummary + onClose := func(summary *WriterSummary) { + resSummary = summary + } + err = MergeOverlappingFilesV2( + ctx, + []string{"/test/0/one-file"}, + memStore, + int64(5*size.MB), + 100, + "/test2", + "mergeID", + 1000, + 1000, + 8*1024, + 1*size.MB, + 2, + onClose, + true, + ) + require.NoError(t, err) + + bufSize := rand.Intn(100) + 1 + kvReader, err := newKVReader(ctx, "/test2/mergeID/one-file", memStore, 0, bufSize) + require.NoError(t, err) + for i := 0; i < kvCnt; i++ { + key, value, err := kvReader.nextKV() + require.NoError(t, err) + require.Equal(t, kvs[i].Key, key) + require.Equal(t, kvs[i].Val, value) + } + _, _, err = kvReader.nextKV() + require.Equal(t, io.EOF, err) + require.NoError(t, kvReader.Close()) + + // check writerSummary. + expected := MultipleFilesStat{ + MinKey: kvs[0].Key, + MaxKey: kvs[len(kvs)-1].Key, + Filenames: [][2]string{ + {"/test2/mergeID/one-file", "/test2/mergeID_stat/one-file"}, + }, + MaxOverlappingNum: 1, + } + require.EqualValues(t, expected.MinKey, resSummary.Min) + require.EqualValues(t, expected.MaxKey, resSummary.Max) + require.Equal(t, expected.Filenames, resSummary.MultipleFilesStats[0].Filenames) + require.Equal(t, expected.MaxOverlappingNum, resSummary.MultipleFilesStats[0].MaxOverlappingNum) + require.EqualValues(t, expectedTotalSize, resSummary.TotalSize) +} diff --git a/br/pkg/lightning/backend/external/util.go b/br/pkg/lightning/backend/external/util.go index e1c4494b89f1b..a9597e42b9883 100644 --- a/br/pkg/lightning/backend/external/util.go +++ b/br/pkg/lightning/backend/external/util.go @@ -20,8 +20,10 @@ import ( "fmt" "slices" "sort" + "strconv" "strings" + "github.com/docker/go-units" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/kv" @@ -62,9 +64,10 @@ func seekPropsOffsets( propKey := kv.Key(p.firstKey) if propKey.Cmp(start) > 0 { if !moved { - return nil, fmt.Errorf("start key %s is too small for stat files %v", + return nil, fmt.Errorf("start key %s is too small for stat files %v, propKey %s", start.String(), paths, + propKey.String(), ) } return offsets, nil @@ -292,3 +295,13 @@ func BytesMax(a, b []byte) []byte { } return b } + +func getSpeed(n uint64, dur float64, isBytes bool) string { + if dur == 0 { + return "-" + } + if isBytes { + return units.BytesSize(float64(n) / dur) + } + return strconv.FormatFloat(float64(n)/dur, 'f', 4, 64) +} diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go index 1122f0a8b554a..6556da432b784 100644 --- a/br/pkg/lightning/backend/external/writer.go +++ b/br/pkg/lightning/backend/external/writer.go @@ -122,6 +122,7 @@ func NewWriterBuilder() *WriterBuilder { // 
SetMemorySizeLimit sets the memory size limit of the writer. When accumulated // data size exceeds this limit, the writer will flush data as a file to external // storage. +// When the writer is OneFileWriter SetMemorySizeLimit sets the preAllocated memory buffer size. func (b *WriterBuilder) SetMemorySizeLimit(size uint64) *WriterBuilder { b.memSizeLimit = size return b @@ -201,6 +202,35 @@ func (b *WriterBuilder) Build( fileMaxKeys: make([]tidbkv.Key, 0, multiFileStatNum), } ret.multiFileStats[0].Filenames = make([][2]string, 0, multiFileStatNum) + + return ret +} + +// BuildOneFile builds a new one file Writer. The writer will create only one file under the prefix +// of "{prefix}/{writerID}". +func (b *WriterBuilder) BuildOneFile( + store storage.ExternalStorage, + prefix string, + writerID string, +) *OneFileWriter { + filenamePrefix := filepath.Join(prefix, writerID) + p := membuf.NewPool(membuf.WithPoolSize(0), membuf.WithBlockSize(b.blockSize)) + + ret := &OneFileWriter{ + rc: &rangePropertiesCollector{ + props: make([]*rangeProperty, 0, 1024), + currProp: &rangeProperty{}, + propSizeDist: b.propSizeDist, + propKeysDist: b.propKeysDist, + }, + kvBuffer: p.NewBuffer(membuf.WithMemoryLimit(b.memSizeLimit)), + store: store, + filenamePrefix: filenamePrefix, + writerID: writerID, + kvStore: nil, + onClose: b.onClose, + closed: false, + } return ret } @@ -389,15 +419,6 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { savedBytes = w.batchSize startTs := time.Now() - getSpeed := func(n uint64, dur float64, isBytes bool) string { - if dur == 0 { - return "-" - } - if isBytes { - return units.BytesSize(float64(n) / dur) - } - return units.HumanSize(float64(n) / dur) - } kvCnt := len(w.kvLocations) defer func() { w.currentSeq++ @@ -501,12 +522,12 @@ func (w *Writer) createStorageWriter(ctx context.Context) ( err error, ) { dataPath := filepath.Join(w.filenamePrefix, strconv.Itoa(w.currentSeq)) - dataWriter, err := w.store.Create(ctx, dataPath, &storage.WriterOption{Concurrency: 20}) + dataWriter, err := w.store.Create(ctx, dataPath, &storage.WriterOption{Concurrency: 20, PartSize: (int64)(5 * size.MB)}) if err != nil { return "", "", nil, nil, err } statPath := filepath.Join(w.filenamePrefix+statSuffix, strconv.Itoa(w.currentSeq)) - statsWriter, err := w.store.Create(ctx, statPath, &storage.WriterOption{Concurrency: 20}) + statsWriter, err := w.store.Create(ctx, statPath, &storage.WriterOption{Concurrency: 20, PartSize: (int64)(5 * size.MB)}) if err != nil { _ = dataWriter.Close(ctx) return "", "", nil, nil, err diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go index 97f58ec5b89fd..cfb3e8b5bba1a 100644 --- a/br/pkg/storage/s3.go +++ b/br/pkg/storage/s3.go @@ -1031,6 +1031,7 @@ func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOpti } } else { up := s3manager.NewUploaderWithClient(rs.svc, func(u *s3manager.Uploader) { + u.PartSize = option.PartSize u.Concurrency = option.Concurrency u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(option.Concurrency * hardcodedS3ChunkSize) }) diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go index 0abecc827414c..348bbd32a5a26 100644 --- a/br/pkg/storage/storage.go +++ b/br/pkg/storage/storage.go @@ -90,6 +90,7 @@ type Writer interface { type WriterOption struct { Concurrency int + PartSize int64 } type ReaderOption struct { diff --git a/pkg/ddl/backfilling_dispatcher.go b/pkg/ddl/backfilling_dispatcher.go index 622ef9deab136..d8f9ff561eed0 100644 --- 
a/pkg/ddl/backfilling_dispatcher.go +++ b/pkg/ddl/backfilling_dispatcher.go @@ -272,7 +272,11 @@ func generatePartitionPlan(tblInfo *model.TableInfo) (metas [][]byte, err error) } func generateNonPartitionPlan( - d *ddl, tblInfo *model.TableInfo, job *model.Job, useCloud bool, instanceCnt int) (metas [][]byte, err error) { + d *ddl, + tblInfo *model.TableInfo, + job *model.Job, + useCloud bool, + instanceCnt int) (metas [][]byte, err error) { tbl, err := getTable((*asAutoIDRequirement)(d.ddlCtx), job.SchemaID, tblInfo) if err != nil { return nil, err @@ -281,6 +285,7 @@ func generateNonPartitionPlan( if err != nil { return nil, errors.Trace(err) } + startKey, endKey, err := getTableRange(d.jobContext(job.ID, job.ReorgMeta), d.ddlCtx, tbl.(table.PhysicalTable), ver.Ver, job.Priority) if startKey == nil && endKey == nil { // Empty table. @@ -387,6 +392,7 @@ func generateGlobalSortIngestPlan( logger.Info("split subtask range", zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey))) + if startKey.Cmp(endKey) >= 0 { return nil, errors.Errorf("invalid range, startKey: %s, endKey: %s", hex.EncodeToString(startKey), hex.EncodeToString(endKey)) diff --git a/pkg/ddl/backfilling_dispatcher_test.go b/pkg/ddl/backfilling_dispatcher_test.go index a7a02231b7fcb..5da673b6625e5 100644 --- a/pkg/ddl/backfilling_dispatcher_test.go +++ b/pkg/ddl/backfilling_dispatcher_test.go @@ -54,6 +54,8 @@ func TestBackfillingDispatcherLocalMode(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockWriterMemSize", "return()")) + defer failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockWriterMemSize") /// 1. test partition table. tk.MustExec("create table tp1(id int primary key, v int) PARTITION BY RANGE (id) (\n " + "PARTITION p0 VALUES LESS THAN (10),\n" + diff --git a/pkg/ddl/backfilling_dist_scheduler.go b/pkg/ddl/backfilling_dist_scheduler.go index a7025ddcf3946..d887bb0a5cf91 100644 --- a/pkg/ddl/backfilling_dist_scheduler.go +++ b/pkg/ddl/backfilling_dist_scheduler.go @@ -89,7 +89,7 @@ func NewBackfillSubtaskExecutor(_ context.Context, taskMeta []byte, d *ddl, return newReadIndexExecutor( d, &bgm.Job, indexInfos, tbl.(table.PhysicalTable), jc, bc, summary, bgm.CloudStorageURI), nil case proto.StepTwo: - return newMergeSortExecutor(jobMeta.ID, indexInfos[0], tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI) + return newMergeSortExecutor(jobMeta.ID, len(indexInfos), tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI) case proto.StepThree: if len(bgm.CloudStorageURI) > 0 { return newCloudImportExecutor(&bgm.Job, jobMeta.ID, indexInfos[0], tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI) diff --git a/pkg/ddl/backfilling_merge_sort.go b/pkg/ddl/backfilling_merge_sort.go index 32d27d7146a06..6de8ee0d7b756 100644 --- a/pkg/ddl/backfilling_merge_sort.go +++ b/pkg/ddl/backfilling_merge_sort.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/ddl/ingest" "github.com/pingcap/tidb/pkg/disttask/framework/proto" - "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/util/intest" @@ -37,7 +36,7 @@ import ( type mergeSortExecutor struct { jobID int64 - index *model.IndexInfo + idxNum int ptbl table.PhysicalTable bc ingest.BackendCtx cloudStoreURI string @@ -47,14 +46,14 @@ type mergeSortExecutor struct { func newMergeSortExecutor( 
jobID int64, - index *model.IndexInfo, + idxNum int, ptbl table.PhysicalTable, bc ingest.BackendCtx, cloudStoreURI string, ) (*mergeSortExecutor, error) { return &mergeSortExecutor{ jobID: jobID, - index: index, + idxNum: idxNum, ptbl: ptbl, bc: bc, cloudStoreURI: cloudStoreURI, @@ -100,9 +99,23 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta prefix := path.Join(strconv.Itoa(int(m.jobID)), strconv.Itoa(int(subtask.ID))) - // TODO: config generated by plan. - return external.MergeOverlappingFiles(ctx, sm.DataFiles, store, 64*1024, prefix, - external.DefaultBlockSize, 8*1024, 1*size.MB, 8*1024, onClose, + partSize, err := getMergeSortPartSize(int(variable.GetDDLReorgWorkerCounter()), m.idxNum) + if err != nil { + return err + } + + return external.MergeOverlappingFiles( + ctx, + sm.DataFiles, + store, + int64(partSize), + 64*1024, + prefix, + external.DefaultBlockSize, + 8*1024, + 1*size.MB, + 8*1024, + onClose, int(variable.GetDDLReorgWorkerCounter()), true) } diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index a766142977243..e5b53c0a369eb 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -48,6 +48,7 @@ import ( "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/memory" + "github.com/pingcap/tidb/pkg/util/size" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -111,6 +112,33 @@ func (ctx *OperatorCtx) OperatorErr() error { return *err } +func getWriterMemSize(idxNum int) (uint64, error) { + failpoint.Inject("mockWriterMemSize", func() { + failpoint.Return(1*size.GB, nil) + }) + _, writerCnt := expectedIngestWorkerCnt() + memTotal, err := memory.MemTotal() + if err != nil { + return 0, err + } + memUsed, err := memory.MemUsed() + if err != nil { + return 0, err + } + memAvailable := memTotal - memUsed + memSize := (memAvailable / 2) / uint64(writerCnt) / uint64(idxNum) + logutil.BgLogger().Info("build operators that write index to cloud storage", zap.Uint64("memory total", memTotal), zap.Uint64("memory used", memUsed), zap.Uint64("memory size", memSize)) + return memSize, nil +} + +func getMergeSortPartSize(concurrency int, idxNum int) (uint64, error) { + writerMemSize, err := getWriterMemSize(idxNum) + if err != nil { + return 0, err + } + return writerMemSize / uint64(concurrency) / 10, nil +} + // NewAddIndexIngestPipeline creates a pipeline for adding index in ingest mode.
func NewAddIndexIngestPipeline( ctx *OperatorCtx, @@ -199,17 +227,10 @@ func NewWriteIndexToExternalStoragePipeline( return nil, err } - memTotal, err := memory.MemTotal() + memSize, err := getWriterMemSize(len(indexes)) if err != nil { return nil, err } - memUsed, err := memory.MemUsed() - if err != nil { - return nil, err - } - memAvailable := memTotal - memUsed - memSize := (memAvailable / 2) / uint64(writerCnt) / uint64(len(indexes)) - logutil.BgLogger().Info("build operators that write index to cloud storage", zap.Uint64("memory total", memTotal), zap.Uint64("memory used", memUsed), zap.Uint64("memory size", memSize)) srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey) scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt) diff --git a/pkg/ddl/backfilling_read_index.go b/pkg/ddl/backfilling_read_index.go index 01aac70dac029..f213e56b36443 100644 --- a/pkg/ddl/backfilling_read_index.go +++ b/pkg/ddl/backfilling_read_index.go @@ -273,6 +273,19 @@ func (r *readIndexExecutor) buildExternalStorePipeline( counter := metrics.BackfillTotalCounter.WithLabelValues( metrics.GenerateReorgLabel("add_idx_rate", r.job.SchemaName, tbl.Meta().Name.O)) return NewWriteIndexToExternalStoragePipeline( - opCtx, d.store, r.cloudStorageURI, r.d.sessPool, sessCtx, r.job.ID, subtaskID, - tbl, r.indexes, start, end, totalRowCount, counter, onClose, r.job.ReorgMeta) + opCtx, + d.store, + r.cloudStorageURI, + r.d.sessPool, + sessCtx, + r.job.ID, + subtaskID, + tbl, + r.indexes, + start, + end, + totalRowCount, + counter, + onClose, + r.job.ReorgMeta) } diff --git a/pkg/disttask/importinto/scheduler.go b/pkg/disttask/importinto/scheduler.go index 6645e597d50a4..2afa6ea412960 100644 --- a/pkg/disttask/importinto/scheduler.go +++ b/pkg/disttask/importinto/scheduler.go @@ -266,6 +266,7 @@ type mergeSortStepExecutor struct { // subtask of a task is run in serial now, so we don't need lock here. // change to SyncMap when we support parallel subtask in the future. subtaskSortedKVMeta *external.SortedKVMeta + partSize int64 } var _ execute.SubtaskExecutor = &mergeSortStepExecutor{} @@ -279,6 +280,8 @@ func (m *mergeSortStepExecutor) Init(ctx context.Context) error { return err } m.controller = controller + // 10000 = max part num + m.partSize = int64(getWriterMemorySizeLimit(&m.taskMeta.Plan) / 10000 * uint64(external.MergeSortOverlapThreshold)) return nil } @@ -305,7 +308,9 @@ func (m *mergeSortStepExecutor) RunSubtask(ctx context.Context, subtask *proto.S prefix := subtaskPrefix(m.taskID, subtask.ID) - return external.MergeOverlappingFiles(ctx, sm.DataFiles, m.controller.GlobalSortStore, 64*1024, + logger.Info("merge sort partSize", zap.String("size", units.BytesSize(float64(m.partSize)))) + + return external.MergeOverlappingFiles(ctx, sm.DataFiles, m.controller.GlobalSortStore, m.partSize, 64*1024, prefix, getKVGroupBlockSize(sm.KVGroup), 8*1024, 1*size.MB, 8*1024, onClose, int(m.taskMeta.Plan.ThreadCnt), false) }
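A minimal usage sketch of the new one-file writer, modeled on writeExternalOneFile in bench_test.go above. The helper name writeOneFile, the prefix, and the buffer/part sizes are illustrative assumptions, not values mandated by the patch:

package sketch

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
	"github.com/pingcap/tidb/br/pkg/storage"
)

// writeOneFile streams already-sorted KV pairs into a single data file and a
// single stat file under "<prefix>/<writerID>" (here "test/external/writer-0").
// Keys must arrive in ascending order; OneFileWriter does not sort them.
func writeOneFile(ctx context.Context, store storage.ExternalStorage, kvs [][2][]byte) error {
	writer := external.NewWriterBuilder().
		SetMemorySizeLimit(64 * 1024 * 1024). // for OneFileWriter this is the pre-allocated KV buffer size
		BuildOneFile(store, "test/external", "writer-0")

	// The second argument is the multipart-upload part size for the data file.
	if err := writer.Init(ctx, 20*1024*1024); err != nil {
		return err
	}
	for _, kv := range kvs {
		if err := writer.WriteRow(ctx, kv[0], kv[1]); err != nil {
			return err
		}
	}
	return writer.Close(ctx)
}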
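The data file written by OneFileWriter.WriteRow keeps the length-prefixed layout the existing kv_reader already understands: an 8-byte big-endian key length, the key, an 8-byte big-endian value length, then the value; range properties are flushed to the stat file whenever the pre-allocated buffer wraps. A standalone sketch of that record encoding (encodeKV is not code from the patch):

package sketch

import "encoding/binary"

// encodeKV mirrors the record layout OneFileWriter.WriteRow appends to the
// data file: 8-byte big-endian key length, key bytes, 8-byte big-endian value
// length, value bytes (lengthBytes is 8 in the external package).
func encodeKV(key, val []byte) []byte {
	buf := make([]byte, 0, 16+len(key)+len(val))
	buf = binary.BigEndian.AppendUint64(buf, uint64(len(key)))
	buf = append(buf, key...)
	buf = binary.BigEndian.AppendUint64(buf, uint64(len(val)))
	buf = append(buf, val...)
	return buf
}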
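How the sizes fit together in the DDL path: getWriterMemSize hands each external writer half of the free memory divided across writers (from expectedIngestWorkerCnt) and indexes, getMergeSortPartSize shrinks that by the merge concurrency with a 10x margin, and MergeOverlappingFiles finally clamps the upload part size to max(5 MiB, partSize + 1 MiB). IMPORT INTO instead derives its part size from the writer memory size limit, the 10000 maximum part number, and MergeSortOverlapThreshold. A worked example with assumed figures, only to make the arithmetic concrete:

package main

import "fmt"

func main() {
	const mb = uint64(1 << 20)
	// Assumed figures for illustration.
	memAvailable := 16 * 1024 * mb // 16 GiB free
	writerCnt := uint64(8)         // ingest writer count
	idxNum := uint64(2)            // indexes being added
	concurrency := uint64(8)       // DDL reorg worker count

	writerMemSize := memAvailable / 2 / writerCnt / idxNum // getWriterMemSize
	partSize := writerMemSize / concurrency / 10           // getMergeSortPartSize

	// MergeOverlappingFiles keeps multipart uploads valid: max(5 MiB, partSize+1 MiB).
	if partSize+mb < 5*mb {
		partSize = 5 * mb
	} else {
		partSize += mb
	}
	fmt.Printf("writer buffer %d MiB, upload part size %d MiB\n", writerMemSize/mb, partSize/mb)
}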
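MergeOverlappingFiles now threads the part size through to the one-file writers, and each MergeOverlappingFilesV2 call reports a single WriterSummary (min key, max key, total size, one data/stat file pair) through onClose, as exercised by TestOnefileWriterManyRows. A sketch of a call site shaped like the one in backfilling_merge_sort.go; the helper name mergeSortedFiles and the concrete numbers are assumptions:

package sketch

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
	"github.com/pingcap/tidb/br/pkg/storage"
	"github.com/pingcap/tidb/pkg/util/size"
)

// mergeSortedFiles merges possibly-overlapping sorted KV files into
// non-overlapping output under newPrefix and collects the writer summaries.
func mergeSortedFiles(ctx context.Context, store storage.ExternalStorage, dataFiles []string, newPrefix string, partSize int64) ([]*external.WriterSummary, error) {
	var summaries []*external.WriterSummary
	onClose := func(s *external.WriterSummary) {
		summaries = append(summaries, s)
	}
	err := external.MergeOverlappingFiles(
		ctx,
		dataFiles,
		store,
		partSize, // multipart-upload part size; clamped to >= 5 MiB inside
		64*1024,  // read buffer size
		newPrefix,
		external.DefaultBlockSize,
		8*1024,    // write batch count
		1*size.MB, // prop size distance
		8*1024,    // prop keys distance
		onClose,
		8,    // concurrency (assumed)
		true, // check hotspot
	)
	return summaries, err
}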