From d426bcdad9a124267e18d158aecd07fd3abdf3c0 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 15 Aug 2023 17:18:30 +0800 Subject: [PATCH] br/lightning: add kv writer for external backend (#46042) ref pingcap/tidb#45719 --- br/pkg/lightning/backend/external/BUILD.bazel | 15 +- br/pkg/lightning/backend/external/file.go | 12 +- .../lightning/backend/external/file_test.go | 26 +- .../lightning/backend/external/iter_test.go | 16 +- .../lightning/backend/external/sharedisk.go | 36 -- br/pkg/lightning/backend/external/writer.go | 341 ++++++++++++++++++ .../lightning/backend/external/writer_test.go | 142 ++++++++ 7 files changed, 533 insertions(+), 55 deletions(-) delete mode 100644 br/pkg/lightning/backend/external/sharedisk.go create mode 100644 br/pkg/lightning/backend/external/writer.go create mode 100644 br/pkg/lightning/backend/external/writer_test.go diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index b77d624a1c2f9..5db3d4eaffc8d 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -8,15 +8,22 @@ go_library( "file.go", "iter.go", "kv_reader.go", - "sharedisk.go", "stat_reader.go", + "writer.go", ], importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/external", visibility = ["//visibility:public"], deps = [ + "//br/pkg/lightning/backend", + "//br/pkg/lightning/backend/encode", + "//br/pkg/lightning/backend/kv", + "//br/pkg/lightning/common", + "//br/pkg/membuf", "//br/pkg/storage", + "//kv", "//util/logutil", "//util/mathutil", + "//util/size", "@com_github_pingcap_errors//:errors", "@org_golang_x_sync//errgroup", "@org_uber_go_zap//:zap", @@ -31,15 +38,19 @@ go_test( "codec_test.go", "file_test.go", "iter_test.go", + "writer_test.go", ], embed = [":external"], flaky = True, - shard_count = 13, + shard_count = 15, deps = [ + "//br/pkg/lightning/backend/kv", + "//br/pkg/lightning/common", "//br/pkg/storage", "@com_github_pingcap_errors//:errors", "@com_github_stretchr_testify//require", "@org_golang_x_exp//rand", + "@org_golang_x_exp//slices", "@org_uber_go_atomic//:atomic", ], ) diff --git a/br/pkg/lightning/backend/external/file.go b/br/pkg/lightning/backend/external/file.go index 40f523512544a..8d5c3dd2bf6af 100644 --- a/br/pkg/lightning/backend/external/file.go +++ b/br/pkg/lightning/backend/external/file.go @@ -95,8 +95,8 @@ func (s *KeyValueStore) AddKeyValue(key, value []byte) error { s.rc.currProp.size += uint64(len(key) + len(value)) s.rc.currProp.keys++ - if s.rc.currProp.size >= s.rc.propSizeIdxDistance || - s.rc.currProp.keys >= s.rc.propKeysIdxDistance { + if s.rc.currProp.size >= s.rc.propSizeDist || + s.rc.currProp.keys >= s.rc.propKeysDist { newProp := *s.rc.currProp s.rc.props = append(s.rc.props, &newProp) @@ -109,6 +109,14 @@ func (s *KeyValueStore) AddKeyValue(key, value []byte) error { return nil } +// Close closes the KeyValueStore and append the last range property. +func (s *KeyValueStore) Close() { + if s.rc.currProp.keys > 0 { + newProp := *s.rc.currProp + s.rc.props = append(s.rc.props, &newProp) + } +} + var statSuffix = filepath.Join("_stat", "0") // GetAllFileNames returns a FilePathHandle that contains all data file paths diff --git a/br/pkg/lightning/backend/external/file_test.go b/br/pkg/lightning/backend/external/file_test.go index fa2b5ccbd79c6..352a94682de17 100644 --- a/br/pkg/lightning/backend/external/file_test.go +++ b/br/pkg/lightning/backend/external/file_test.go @@ -31,8 +31,8 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { writer, err := memStore.Create(ctx, "/test", nil) require.NoError(t, err) rc := &rangePropertiesCollector{ - propSizeIdxDistance: 100, - propKeysIdxDistance: 2, + propSizeDist: 100, + propKeysDist: 2, } rc.reset() initRC := *rc @@ -49,7 +49,7 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { // when not accumulated enough data, no range property will be added. require.Equal(t, &initRC, rc) - // propKeysIdxDistance = 2, so after adding 2 keys, a new range property will be added. + // propKeysDist = 2, so after adding 2 keys, a new range property will be added. k2, v2 := []byte("key2"), []byte("value2") err = kvStore.AddKeyValue(k2, v2) require.NoError(t, err) @@ -72,12 +72,21 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { err = writer.Close(ctx) require.NoError(t, err) + kvStore.Close() + expected = &rangeProperty{ + key: k3, + offset: uint64(len(k1) + len(v1) + 16 + len(k2) + len(v2) + 16), + size: uint64(len(k3) + len(v3)), + keys: 1, + } + require.Len(t, rc.props, 2) + require.Equal(t, expected, rc.props[1]) writer, err = memStore.Create(ctx, "/test2", nil) require.NoError(t, err) rc = &rangePropertiesCollector{ - propSizeIdxDistance: 1, - propKeysIdxDistance: 100, + propSizeDist: 1, + propKeysDist: 100, } rc.reset() kvStore, err = NewKeyValueStore(ctx, writer, rc, 2, 2) @@ -103,6 +112,9 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) { keys: 1, } require.Equal(t, expected, rc.props[1]) + kvStore.Close() + // Length of properties should not change after close. + require.Len(t, rc.props, 2) err = writer.Close(ctx) require.NoError(t, err) } @@ -116,8 +128,8 @@ func TestKVReadWrite(t *testing.T) { writer, err := memStore.Create(ctx, "/test", nil) require.NoError(t, err) rc := &rangePropertiesCollector{ - propSizeIdxDistance: 100, - propKeysIdxDistance: 2, + propSizeDist: 100, + propKeysDist: 2, } rc.reset() kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1) diff --git a/br/pkg/lightning/backend/external/iter_test.go b/br/pkg/lightning/backend/external/iter_test.go index 1698c76831307..83d40f1206e7b 100644 --- a/br/pkg/lightning/backend/external/iter_test.go +++ b/br/pkg/lightning/backend/external/iter_test.go @@ -66,8 +66,8 @@ func TestMergeKVIter(t *testing.T) { writer, err := memStore.Create(ctx, filename, nil) require.NoError(t, err) rc := &rangePropertiesCollector{ - propSizeIdxDistance: 100, - propKeysIdxDistance: 2, + propSizeDist: 100, + propKeysDist: 2, } rc.reset() kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1) @@ -118,8 +118,8 @@ func TestOneUpstream(t *testing.T) { writer, err := memStore.Create(ctx, filename, nil) require.NoError(t, err) rc := &rangePropertiesCollector{ - propSizeIdxDistance: 100, - propKeysIdxDistance: 2, + propSizeDist: 100, + propKeysDist: 2, } rc.reset() kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1) @@ -196,8 +196,8 @@ func TestCorruptContent(t *testing.T) { writer, err := memStore.Create(ctx, filename, nil) require.NoError(t, err) rc := &rangePropertiesCollector{ - propSizeIdxDistance: 100, - propKeysIdxDistance: 2, + propSizeDist: 100, + propKeysDist: 2, } rc.reset() kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1) @@ -249,8 +249,8 @@ func generateMockFileReader() *kvReader { panic(err) } rc := &rangePropertiesCollector{ - propSizeIdxDistance: 100, - propKeysIdxDistance: 2, + propSizeDist: 100, + propKeysDist: 2, } rc.reset() kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1) diff --git a/br/pkg/lightning/backend/external/sharedisk.go b/br/pkg/lightning/backend/external/sharedisk.go deleted file mode 100644 index 5da36bb9f520e..0000000000000 --- a/br/pkg/lightning/backend/external/sharedisk.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package external - -// rangePropertiesCollector collects range properties for each range. The zero -// value of rangePropertiesCollector is not ready to use, should call reset() -// first. -type rangePropertiesCollector struct { - props []*rangeProperty - currProp *rangeProperty - propSizeIdxDistance uint64 - propKeysIdxDistance uint64 -} - -func (rc *rangePropertiesCollector) reset() { - rc.props = rc.props[:0] - rc.currProp = &rangeProperty{} -} - -// encode encodes rc.props to a byte slice. -func (rc *rangePropertiesCollector) encode() []byte { - b := make([]byte, 0, 1024) - return encodeMultiProps(b, rc.props) -} diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go new file mode 100644 index 0000000000000..79c6349953ba1 --- /dev/null +++ b/br/pkg/lightning/backend/external/writer.go @@ -0,0 +1,341 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package external + +import ( + "bytes" + "context" + "encoding/hex" + "path/filepath" + "strconv" + "time" + + "slices" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/lightning/backend" + "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" + "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" + "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/membuf" + "github.com/pingcap/tidb/br/pkg/storage" + tidbkv "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/size" + "go.uber.org/zap" +) + +// rangePropertiesCollector collects range properties for each range. The zero +// value of rangePropertiesCollector is not ready to use, should call reset() +// first. +type rangePropertiesCollector struct { + props []*rangeProperty + currProp *rangeProperty + propSizeDist uint64 + propKeysDist uint64 +} + +func (rc *rangePropertiesCollector) reset() { + rc.props = rc.props[:0] + rc.currProp = &rangeProperty{} +} + +// encode encodes rc.props to a byte slice. +func (rc *rangePropertiesCollector) encode() []byte { + b := make([]byte, 0, 1024) + return encodeMultiProps(b, rc.props) +} + +// WriterSummary is the summary of a writer. +type WriterSummary struct { + WriterID int + Seq int + Min tidbkv.Key + Max tidbkv.Key + TotalSize uint64 +} + +// OnCloseFunc is the callback function when a writer is closed. +type OnCloseFunc func(summary *WriterSummary) + +// DummyOnCloseFunc is a dummy OnCloseFunc. +func DummyOnCloseFunc(*WriterSummary) {} + +// WriterBuilder builds a new Writer. +type WriterBuilder struct { + memSizeLimit uint64 + writeBatchCount uint64 + propSizeDist uint64 + propKeysDist uint64 + onClose OnCloseFunc + + bufferPool *membuf.Pool +} + +// NewWriterBuilder creates a WriterBuilder. +func NewWriterBuilder() *WriterBuilder { + return &WriterBuilder{ + memSizeLimit: 256 * size.MB, + writeBatchCount: 8 * 1024, + propSizeDist: 1 * size.MB, + propKeysDist: 8 * 1024, + onClose: DummyOnCloseFunc, + } +} + +// SetMemorySizeLimit sets the memory size limit of the writer. +func (b *WriterBuilder) SetMemorySizeLimit(size uint64) *WriterBuilder { + b.memSizeLimit = size + return b +} + +// SetWriterBatchCount sets the batch count of the writer. +func (b *WriterBuilder) SetWriterBatchCount(count uint64) *WriterBuilder { + b.writeBatchCount = count + return b +} + +// SetPropSizeDistance sets the distance of range size for each property. +func (b *WriterBuilder) SetPropSizeDistance(dist uint64) *WriterBuilder { + b.propSizeDist = dist + return b +} + +// SetPropKeysDistance sets the distance of range keys for each property. +func (b *WriterBuilder) SetPropKeysDistance(dist uint64) *WriterBuilder { + b.propKeysDist = dist + return b +} + +// SetOnCloseFunc sets the callback function when a writer is closed. +func (b *WriterBuilder) SetOnCloseFunc(onClose OnCloseFunc) *WriterBuilder { + b.onClose = onClose + return b +} + +// SetBufferPool sets the buffer pool of the writer. +func (b *WriterBuilder) SetBufferPool(bufferPool *membuf.Pool) *WriterBuilder { + b.bufferPool = bufferPool + return b +} + +// Build builds a new Writer. +func (b *WriterBuilder) Build( + store storage.ExternalStorage, + writerID int, + filenamePrefix string, +) *Writer { + bp := b.bufferPool + if bp == nil { + bp = membuf.NewPool() + } + return &Writer{ + rc: &rangePropertiesCollector{ + props: make([]*rangeProperty, 0, 1024), + currProp: &rangeProperty{}, + propSizeDist: b.propSizeDist, + propKeysDist: b.propKeysDist, + }, + memSizeLimit: b.memSizeLimit, + store: store, + kvBuffer: bp.NewBuffer(), + writeBatch: make([]common.KvPair, 0, b.writeBatchCount), + currentSeq: 0, + filenamePrefix: filenamePrefix, + writerID: writerID, + kvStore: nil, + onClose: b.onClose, + closed: false, + } +} + +// Writer is used to write data into external storage. +type Writer struct { + store storage.ExternalStorage + writerID int + currentSeq int + filenamePrefix string + + kvStore *KeyValueStore + rc *rangePropertiesCollector + + memSizeLimit uint64 + + kvBuffer *membuf.Buffer + writeBatch []common.KvPair + + onClose OnCloseFunc + closed bool + + // Statistic information per batch. + batchSize uint64 + + // Statistic information per writer. + minKey tidbkv.Key + maxKey tidbkv.Key + totalSize uint64 +} + +// AppendRows appends rows to the external storage. +// Note that this method is NOT thread-safe. +func (w *Writer) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error { + kvs := kv.Rows2KvPairs(rows) + for _, pair := range kvs { + w.batchSize += uint64(len(pair.Key) + len(pair.Val)) + key := w.kvBuffer.AddBytes(pair.Key) + val := w.kvBuffer.AddBytes(pair.Val) + w.writeBatch = append(w.writeBatch, common.KvPair{Key: key, Val: val}) + if w.batchSize >= w.memSizeLimit { + if err := w.flushKVs(ctx); err != nil { + return err + } + } + } + return nil +} + +// IsSynced implements the backend.EngineWriter interface. +func (w *Writer) IsSynced() bool { + return false +} + +// Close closes the writer. +func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) { + if w.closed { + return status(false), errors.Errorf("writer %d has been closed", w.writerID) + } + w.closed = true + defer w.kvBuffer.Destroy() + err := w.flushKVs(ctx) + if err != nil { + return status(false), err + } + + logutil.Logger(ctx).Info("close writer", + zap.Int("writerID", w.writerID), + zap.String("minKey", hex.EncodeToString(w.minKey)), + zap.String("maxKey", hex.EncodeToString(w.maxKey))) + + w.writeBatch = nil + + w.onClose(&WriterSummary{ + WriterID: w.writerID, + Seq: w.currentSeq, + Min: w.minKey, + Max: w.maxKey, + TotalSize: w.totalSize, + }) + return status(true), nil +} + +func (w *Writer) recordMinMax(newMin, newMax tidbkv.Key, size uint64) { + if len(w.minKey) == 0 || newMin.Cmp(w.minKey) < 0 { + w.minKey = newMin.Clone() + } + if len(w.maxKey) == 0 || newMax.Cmp(w.maxKey) > 0 { + w.maxKey = newMax.Clone() + } + w.totalSize += size +} + +type status bool + +// Flushed implements the backend.ChunkFlushStatus interface. +func (s status) Flushed() bool { + return bool(s) +} + +func (w *Writer) flushKVs(ctx context.Context) (err error) { + if len(w.writeBatch) == 0 { + return nil + } + + dataWriter, statWriter, err := w.createStorageWriter(ctx) + if err != nil { + return err + } + + ts := time.Now() + var savedBytes uint64 + + defer func() { + w.currentSeq++ + err1, err2 := dataWriter.Close(ctx), statWriter.Close(ctx) + if err != nil { + return + } + if err1 != nil { + logutil.Logger(ctx).Error("close data writer failed", zap.Error(err)) + err = err1 + return + } + if err2 != nil { + logutil.Logger(ctx).Error("close stat writer failed", zap.Error(err)) + err = err2 + return + } + logutil.Logger(ctx).Info("flush kv", + zap.Duration("time", time.Since(ts)), + zap.Uint64("bytes", savedBytes), + zap.Any("rate", float64(savedBytes)/1024.0/1024.0/time.Since(ts).Seconds())) + }() + + slices.SortFunc(w.writeBatch[:], func(i, j common.KvPair) int { + return bytes.Compare(i.Key, j.Key) + }) + + w.kvStore, err = NewKeyValueStore(ctx, dataWriter, w.rc, w.writerID, w.currentSeq) + if err != nil { + return err + } + + var kvSize uint64 + for _, pair := range w.writeBatch { + err = w.kvStore.AddKeyValue(pair.Key, pair.Val) + if err != nil { + return err + } + kvSize += uint64(len(pair.Key)) + uint64(len(pair.Val)) + } + + w.kvStore.Close() + _, err = statWriter.Write(ctx, w.rc.encode()) + if err != nil { + return err + } + + w.recordMinMax(w.writeBatch[0].Key, w.writeBatch[len(w.writeBatch)-1].Key, kvSize) + + w.writeBatch = w.writeBatch[:0] + w.rc.reset() + w.kvBuffer.Reset() + savedBytes = w.batchSize + w.batchSize = 0 + return nil +} + +func (w *Writer) createStorageWriter(ctx context.Context) (data, stats storage.ExternalFileWriter, err error) { + dataPath := filepath.Join(w.filenamePrefix, strconv.Itoa(w.currentSeq)) + dataWriter, err := w.store.Create(ctx, dataPath, nil) + if err != nil { + return nil, nil, err + } + statPath := filepath.Join(w.filenamePrefix+"_stat", strconv.Itoa(w.currentSeq)) + statsWriter, err := w.store.Create(ctx, statPath, nil) + if err != nil { + return nil, nil, err + } + return dataWriter, statsWriter, nil +} diff --git a/br/pkg/lightning/backend/external/writer_test.go b/br/pkg/lightning/backend/external/writer_test.go new file mode 100644 index 0000000000000..b540b82898516 --- /dev/null +++ b/br/pkg/lightning/backend/external/writer_test.go @@ -0,0 +1,142 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package external + +import ( + "bytes" + "context" + "fmt" + "io" + "strings" + "testing" + "time" + + "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" + "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" + "golang.org/x/exp/slices" +) + +func TestWriter(t *testing.T) { + seed := time.Now().Unix() + rand.Seed(uint64(seed)) + t.Logf("seed: %d", seed) + ctx := context.Background() + memStore := storage.NewMemStorage() + + writer := NewWriterBuilder(). + SetPropSizeDistance(100). + SetPropKeysDistance(2). + Build(memStore, 0, "/test") + + kvCnt := rand.Intn(10) + 10 + kvs := make([]common.KvPair, kvCnt) + for i := 0; i < kvCnt; i++ { + randLen := rand.Intn(10) + 1 + kvs[i].Key = make([]byte, randLen) + _, err := rand.Read(kvs[i].Key) + require.NoError(t, err) + randLen = rand.Intn(10) + 1 + kvs[i].Val = make([]byte, randLen) + _, err = rand.Read(kvs[i].Val) + require.NoError(t, err) + } + rows := kv.MakeRowsFromKvPairs(kvs) + err := writer.AppendRows(ctx, nil, rows) + require.NoError(t, err) + + _, err = writer.Close(ctx) + require.NoError(t, err) + + slices.SortFunc(kvs, func(i, j common.KvPair) bool { + return bytes.Compare(i.Key, j.Key) < 0 + }) + + bufSize := rand.Intn(100) + 1 + kvReader, err := newKVReader(ctx, "/test/0", memStore, 0, bufSize) + require.NoError(t, err) + for i := 0; i < kvCnt; i++ { + key, value, err := kvReader.nextKV() + require.NoError(t, err) + require.Equal(t, kvs[i].Key, key) + require.Equal(t, kvs[i].Val, value) + } + _, _, err = kvReader.nextKV() + require.Equal(t, io.EOF, err) + + statReader, err := newStatsReader(ctx, memStore, "/test_stat/0", bufSize) + require.NoError(t, err) + + var keyCnt uint64 = 0 + for { + p, err := statReader.nextProp() + if err == io.EOF { + break + } + require.NoError(t, err) + keyCnt += p.keys + } + require.Equal(t, uint64(kvCnt), keyCnt) +} + +func TestWriterFlushMultiFileNames(t *testing.T) { + seed := time.Now().Unix() + rand.Seed(uint64(seed)) + t.Logf("seed: %d", seed) + ctx := context.Background() + memStore := storage.NewMemStorage() + + writer := NewWriterBuilder(). + SetPropKeysDistance(2). + SetMemorySizeLimit(60). + Build(memStore, 0, "/test") + + // 200 bytes key values. + kvCnt := 10 + kvs := make([]common.KvPair, kvCnt) + for i := 0; i < kvCnt; i++ { + kvs[i].Key = make([]byte, 10) + _, err := rand.Read(kvs[i].Key) + require.NoError(t, err) + kvs[i].Val = make([]byte, 10) + _, err = rand.Read(kvs[i].Val) + require.NoError(t, err) + } + rows := kv.MakeRowsFromKvPairs(kvs) + err := writer.AppendRows(ctx, nil, rows) + require.NoError(t, err) + + _, err = writer.Close(ctx) + require.NoError(t, err) + + var dataFiles, statFiles []string + err = memStore.WalkDir(ctx, &storage.WalkOption{SubDir: "/test"}, func(path string, size int64) error { + if strings.Contains(path, "_stat") { + statFiles = append(statFiles, path) + } else { + dataFiles = append(dataFiles, path) + } + return nil + }) + require.NoError(t, err) + require.Len(t, dataFiles, 4) + require.Len(t, statFiles, 4) + for i := 0; i < 4; i++ { + require.Equal(t, dataFiles[i], fmt.Sprintf("/test/%d", i)) + require.Equal(t, statFiles[i], fmt.Sprintf("/test_stat/%d", i)) + } +}