diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel
index 5db3d4eaffc8d..1bba1a5b87da7 100644
--- a/br/pkg/lightning/backend/external/BUILD.bazel
+++ b/br/pkg/lightning/backend/external/BUILD.bazel
@@ -5,10 +5,12 @@ go_library(
     srcs = [
         "byte_reader.go",
         "codec.go",
+        "engine.go",
         "file.go",
         "iter.go",
         "kv_reader.go",
         "stat_reader.go",
+        "util.go",
         "writer.go",
     ],
     importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/external",
@@ -21,6 +23,7 @@ go_library(
         "//br/pkg/membuf",
         "//br/pkg/storage",
         "//kv",
+        "//util/hack",
         "//util/logutil",
         "//util/mathutil",
         "//util/size",
@@ -36,13 +39,15 @@ go_test(
     srcs = [
         "byte_reader_test.go",
         "codec_test.go",
+        "engine_test.go",
         "file_test.go",
         "iter_test.go",
+        "util_test.go",
         "writer_test.go",
     ],
     embed = [":external"],
     flaky = True,
-    shard_count = 15,
+    shard_count = 19,
     deps = [
         "//br/pkg/lightning/backend/kv",
         "//br/pkg/lightning/common",
diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go
new file mode 100644
index 0000000000000..357978f237601
--- /dev/null
+++ b/br/pkg/lightning/backend/external/engine.go
@@ -0,0 +1,60 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package external
+
+import (
+	"context"
+	"encoding/hex"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+)
+
+// Engine stores sorted key/value pairs in an external storage.
+type Engine struct {
+	storage    storage.ExternalStorage
+	dataFiles  []string
+	statsFiles []string
+}
+
+func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIter, error) {
+	logger := logutil.Logger(ctx)
+
+	var offsets []uint64
+	if len(e.statsFiles) == 0 {
+		offsets = make([]uint64, len(e.dataFiles))
+		logger.Info("no stats files",
+			zap.String("startKey", hex.EncodeToString(start)))
+	} else {
+		offs, err := seekPropsOffsets(ctx, start, e.statsFiles, e.storage)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		offsets = offs
+		logger.Info("seek props offsets",
+			zap.Uint64s("offsets", offsets),
+			zap.String("startKey", hex.EncodeToString(start)),
+			zap.Strings("dataFiles", prettyFileNames(e.dataFiles)),
+			zap.Strings("statsFiles", prettyFileNames(e.statsFiles)))
+	}
+	iter, err := NewMergeKVIter(ctx, e.dataFiles, offsets, e.storage, 64*1024)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	return iter, nil
+}
diff --git a/br/pkg/lightning/backend/external/engine_test.go b/br/pkg/lightning/backend/external/engine_test.go
new file mode 100644
index 0000000000000..b27cb8770d7c8
--- /dev/null
+++ b/br/pkg/lightning/backend/external/engine_test.go
@@ -0,0 +1,109 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package external
+
+import (
+	"bytes"
+	"context"
+	"slices"
+	"testing"
+	"time"
+
+	"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
+	"github.com/pingcap/tidb/br/pkg/lightning/common"
+	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/stretchr/testify/require"
+	"golang.org/x/exp/rand"
+)
+
+func TestIter(t *testing.T) {
+	seed := time.Now().Unix()
+	rand.Seed(uint64(seed))
+	t.Logf("seed: %d", seed)
+
+	totalKV := 300
+	kvPairs := make([]common.KvPair, totalKV)
+	for i := range kvPairs {
+		keyBuf := make([]byte, rand.Intn(10)+1)
+		rand.Read(keyBuf)
+		// make sure the key is unique
+		kvPairs[i].Key = append(keyBuf, byte(i/255), byte(i%255))
+		valBuf := make([]byte, rand.Intn(10)+1)
+		rand.Read(valBuf)
+		kvPairs[i].Val = valBuf
+	}
+
+	sortedKVPairs := make([]common.KvPair, totalKV)
+	copy(sortedKVPairs, kvPairs)
+	slices.SortFunc(sortedKVPairs, func(i, j common.KvPair) int {
+		return bytes.Compare(i.Key, j.Key)
+	})
+
+	ctx := context.Background()
+	store := storage.NewMemStorage()
+
+	for i := 0; i < 3; i++ {
+		w := NewWriterBuilder().
+			SetMemorySizeLimit(uint64(rand.Intn(100)+1)).
+			SetPropSizeDistance(uint64(rand.Intn(50)+1)).
+			SetPropKeysDistance(uint64(rand.Intn(10)+1)).
+			Build(store, i, "/subtask")
+		kvStart := i * 100
+		kvEnd := (i + 1) * 100
+		err := w.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvPairs[kvStart:kvEnd]))
+		require.NoError(t, err)
+		_, err = w.Close(ctx)
+		require.NoError(t, err)
+	}
+
+	dataFiles, statFiles, err := GetAllFileNames(ctx, store, "/subtask")
+	require.NoError(t, err)
+
+	engine := Engine{
+		storage:    store,
+		dataFiles:  dataFiles,
+		statsFiles: statFiles,
+	}
+	iter, err := engine.createMergeIter(ctx, sortedKVPairs[0].Key)
+	require.NoError(t, err)
+	got := make([]common.KvPair, 0, totalKV)
+	for iter.Next() {
+		got = append(got, common.KvPair{
+			Key: iter.Key(),
+			Val: iter.Value(),
+		})
+	}
+	require.NoError(t, iter.Error())
+	require.Equal(t, sortedKVPairs, got)
+
+	pickStartIdx := rand.Intn(len(sortedKVPairs))
+	startKey := sortedKVPairs[pickStartIdx].Key
+	iter, err = engine.createMergeIter(ctx, startKey)
+	require.NoError(t, err)
+	got = make([]common.KvPair, 0, totalKV)
+	for iter.Next() {
+		got = append(got, common.KvPair{
+			Key: iter.Key(),
+			Val: iter.Value(),
+		})
+	}
+	require.NoError(t, iter.Error())
+	// got keys must be ascending
+	for i := 1; i < len(got); i++ {
+		require.True(t, bytes.Compare(got[i-1].Key, got[i].Key) < 0)
+	}
+	// the first key must be less than or equal to startKey
+	require.True(t, bytes.Compare(got[0].Key, startKey) <= 0)
+}
diff --git a/br/pkg/lightning/backend/external/file.go b/br/pkg/lightning/backend/external/file.go
index 8d5c3dd2bf6af..685f0495e4a2b 100644
--- a/br/pkg/lightning/backend/external/file.go
+++ b/br/pkg/lightning/backend/external/file.go
@@ -17,9 +17,6 @@ package external
 import (
 	"context"
 	"encoding/binary"
-	"path/filepath"
-	"strconv"
-	"strings"
 
 	"github.com/pingcap/tidb/br/pkg/storage"
 )
@@ -117,77 +114,4 @@ func (s *KeyValueStore) Close() {
 	}
 }
 
-var statSuffix = filepath.Join("_stat", "0")
-
-// GetAllFileNames returns a FilePathHandle that contains all data file paths
-// and a slice of stat file paths.
-func GetAllFileNames(
-	ctx context.Context,
-	store storage.ExternalStorage,
-	subDir string,
-) (FilePathHandle, []string, error) {
-	var dataFilePaths FilePathHandle
-	var stats []string
-
-	err := store.WalkDir(ctx,
-		&storage.WalkOption{SubDir: subDir},
-		func(path string, size int64) error {
-			if strings.HasSuffix(path, statSuffix) {
-				stats = append(stats, path)
-			} else {
-				dir, file := filepath.Split(path)
-				writerID, err := strconv.Atoi(filepath.Base(dir))
-				if err != nil {
-					return err
-				}
-				seq, err := strconv.Atoi(file)
-				if err != nil {
-					return err
-				}
-				dataFilePaths.set(writerID, seq, path)
-			}
-			return nil
-		})
-	if err != nil {
-		return dataFilePaths, nil, err
-	}
-	return dataFilePaths, stats, nil
-}
-
-// FilePathHandle handles data file paths under a prefix path.
-type FilePathHandle struct {
-	paths [][]string
-}
-
-func (p *FilePathHandle) set(writerID, seq int, path string) {
-	if writerID >= len(p.paths) {
-		p.paths = append(p.paths, make([][]string, writerID-len(p.paths)+1)...)
-	}
-	if seq >= len(p.paths[writerID]) {
-		p.paths[writerID] = append(p.paths[writerID], make([]string, seq-len(p.paths[writerID])+1)...)
-	}
-	p.paths[writerID][seq] = path
-}
-
-// Get returns the path of the data file with the given writerID and seq.
-func (p *FilePathHandle) Get(writerID, seq int) string {
-	return p.paths[writerID][seq]
-}
-
-// ForEach applies the given function to each data file path.
-func (p *FilePathHandle) ForEach(f func(writerID, seq int, path string)) {
-	for writerID, paths := range p.paths {
-		for seq, path := range paths {
-			f(writerID, seq, path)
-		}
-	}
-}
-
-// FlatSlice returns a flat slice of all data file paths.
-func (p *FilePathHandle) FlatSlice() []string {
-	var paths []string
-	p.ForEach(func(writerID, seq int, path string) {
-		paths = append(paths, path)
-	})
-	return paths
-}
+const statSuffix = "_stat"
diff --git a/br/pkg/lightning/backend/external/iter.go b/br/pkg/lightning/backend/external/iter.go
index f892783fb6a27..1a7874d4baa54 100644
--- a/br/pkg/lightning/backend/external/iter.go
+++ b/br/pkg/lightning/backend/external/iter.go
@@ -378,6 +378,10 @@ func (i *MergePropIter) prop() *rangeProperty {
 	return i.iter.curr
 }
 
+func (i *MergePropIter) readerIndex() int {
+	return i.iter.lastReaderIdx
+}
+
 // Close closes the iterator.
 func (i *MergePropIter) Close() error {
 	return i.iter.close()
diff --git a/br/pkg/lightning/backend/external/util.go b/br/pkg/lightning/backend/external/util.go
new file mode 100644
index 0000000000000..505ef41660ee2
--- /dev/null
+++ b/br/pkg/lightning/backend/external/util.go
@@ -0,0 +1,120 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package external
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"path/filepath"
+	"sort"
+	"strings"
+
+	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/util/hack"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+)
+
+// prettyFileNames removes the directory prefix except the last level from the
+// file names.
+func prettyFileNames(files []string) []string {
+	names := make([]string, 0, len(files))
+	for _, f := range files {
+		dir, file := filepath.Split(f)
+		names = append(names, fmt.Sprintf("%s/%s", filepath.Base(dir), file))
+	}
+	return names
+}
+
+// seekPropsOffsets reads the stat files and, for each of them, returns the
+// largest offset in the corresponding sorted data file whose key is less than
+// or equal to the given start key.
+func seekPropsOffsets(
+	ctx context.Context,
+	start kv.Key,
+	paths []string,
+	exStorage storage.ExternalStorage,
+) ([]uint64, error) {
+	iter, err := NewMergePropIter(ctx, paths, exStorage)
+	if err != nil {
+		return nil, err
+	}
+	logger := logutil.Logger(ctx)
+	defer func() {
+		if err := iter.Close(); err != nil {
+			logger.Warn("failed to close merge prop iterator", zap.Error(err))
+		}
+	}()
+	offsets := make([]uint64, len(paths))
+	moved := false
+	for iter.Next() {
+		p := iter.prop()
+		propKey := kv.Key(p.key)
+		if propKey.Cmp(start) > 0 {
+			if !moved {
+				return nil, fmt.Errorf("start key %s is too small for stat files %v",
+					start.String(),
+					paths,
+				)
+			}
+			return offsets, nil
+		}
+		moved = true
+		offsets[iter.readerIndex()] = p.offset
+	}
+	if iter.Error() != nil {
+		return nil, iter.Error()
+	}
+	return offsets, nil
+}
+
+// GetAllFileNames returns the data file paths and stat file paths under subDir.
+// Both slices are sorted.
+func GetAllFileNames(
+	ctx context.Context,
+	store storage.ExternalStorage,
+	subDir string,
+) ([]string, []string, error) {
+	var data []string
+	var stats []string
+
+	err := store.WalkDir(ctx,
+		&storage.WalkOption{SubDir: subDir},
+		func(path string, size int64) error {
+			// path example: /subtask/0_stat/0
+
+			// extract the parent dir
+			bs := hack.Slice(path)
+			lastIdx := bytes.LastIndexByte(bs, '/')
+			secondLastIdx := bytes.LastIndexByte(bs[:lastIdx], '/')
+			parentDir := path[secondLastIdx+1 : lastIdx]
+
+			if strings.HasSuffix(parentDir, statSuffix) {
+				stats = append(stats, path)
+			} else {
+				data = append(data, path)
+			}
+			return nil
+		})
+	if err != nil {
+		return nil, nil, err
+	}
+	// in case the external storage does not guarantee the order of walk
+	sort.Strings(data)
+	sort.Strings(stats)
+	return data, stats, nil
+}
diff --git a/br/pkg/lightning/backend/external/util_test.go b/br/pkg/lightning/backend/external/util_test.go
new file mode 100644
index 0000000000000..622872a6bfe27
--- /dev/null
+++ b/br/pkg/lightning/backend/external/util_test.go
@@ -0,0 +1,173 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package external
+
+import (
+	"context"
+	"testing"
+
+	"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
+	"github.com/pingcap/tidb/br/pkg/lightning/common"
+	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/stretchr/testify/require"
+)
+
+func TestPrettyFileNames(t *testing.T) {
+	filenames := []string{
+		"/tmp/br/backup/1/1_1.sst",
+		"/tmp/br/2/1_2.sst",
+		"/tmp/123/1/1_3",
+	}
+	expected := []string{
+		"1/1_1.sst",
+		"2/1_2.sst",
+		"1/1_3",
+	}
+	require.Equal(t, expected, prettyFileNames(filenames))
+}
+
+func TestSeekPropsOffsets(t *testing.T) {
+	ctx := context.Background()
+	store := storage.NewMemStorage()
+
+	rc1 := &rangePropertiesCollector{
+		props: []*rangeProperty{
+			{
+				key:    []byte("key1"),
+				offset: 10,
+			},
+			{
+				key:    []byte("key3"),
+				offset: 30,
+			},
+			{
+				key:    []byte("key5"),
+				offset: 50,
+			},
+		},
+	}
+	file1 := "/test1"
+	w1, err := store.Create(ctx, file1, nil)
+	require.NoError(t, err)
+	_, err = w1.Write(ctx, rc1.encode())
+	require.NoError(t, err)
+	err = w1.Close(ctx)
+	require.NoError(t, err)
+
+	rc2 := &rangePropertiesCollector{
+		props: []*rangeProperty{
+			{
+				key:    []byte("key2"),
+				offset: 20,
+			},
+			{
+				key:    []byte("key4"),
+				offset: 40,
+			},
+		},
+	}
+	file2 := "/test2"
+	w2, err := store.Create(ctx, file2, nil)
+	require.NoError(t, err)
+	_, err = w2.Write(ctx, rc2.encode())
+	require.NoError(t, err)
+	err = w2.Close(ctx)
+	require.NoError(t, err)
+
+	got, err := seekPropsOffsets(ctx, []byte("key2.5"), []string{file1, file2}, store)
+	require.NoError(t, err)
+	require.Equal(t, []uint64{10, 20}, got)
+	got, err = seekPropsOffsets(ctx, []byte("key3"), []string{file1, file2}, store)
+	require.NoError(t, err)
+	require.Equal(t, []uint64{30, 20}, got)
+	got, err = seekPropsOffsets(ctx, []byte("key0"), []string{file1, file2}, store)
+	require.ErrorContains(t, err, "start key 6b657930 is too small for stat files [/test1 /test2]")
+	got, err = seekPropsOffsets(ctx, []byte("key1"), []string{file1, file2}, store)
+	require.NoError(t, err)
+	require.Equal(t, []uint64{10, 0}, got)
+	got, err = seekPropsOffsets(ctx, []byte("key999"), []string{file1, file2}, store)
+	require.NoError(t, err)
+	require.Equal(t, []uint64{50, 40}, got)
+
+	file3 := "/test3"
+	w3, err := store.Create(ctx, file3, nil)
+	require.NoError(t, err)
+	require.NoError(t, w3.Close(ctx))
+
+	file4 := "/test4"
+	w4, err := store.Create(ctx, file4, nil)
+	require.NoError(t, err)
+	_, err = w4.Write(ctx, rc1.encode())
+	require.NoError(t, err)
+	err = w4.Close(ctx)
+	require.NoError(t, err)
+	got, err = seekPropsOffsets(ctx, []byte("key3"), []string{file1, file2, file3, file4}, store)
+	require.NoError(t, err)
+	require.Equal(t, []uint64{30, 20, 0, 30}, got)
+}
+
+func TestGetAllFileNames(t *testing.T) {
+	ctx := context.Background()
+	store := storage.NewMemStorage()
+	w := NewWriterBuilder().
+		SetMemorySizeLimit(20).
+		SetPropSizeDistance(5).
+		SetPropKeysDistance(3).
+		Build(store, 0, "/subtask")
+	kvPairs := make([]common.KvPair, 0, 30)
+	for i := 0; i < 30; i++ {
+		kvPairs = append(kvPairs, common.KvPair{
+			Key: []byte{byte(i)},
+			Val: []byte{byte(i)},
+		})
+	}
+	err := w.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvPairs))
+	require.NoError(t, err)
+	_, err = w.Close(ctx)
+	require.NoError(t, err)
+
+	w2 := NewWriterBuilder().
+		SetMemorySizeLimit(20).
+		SetPropSizeDistance(5).
+		SetPropKeysDistance(3).
+		Build(store, 3, "/subtask")
+	err = w2.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvPairs))
+	require.NoError(t, err)
+	_, err = w2.Close(ctx)
+	require.NoError(t, err)
+
+	w3 := NewWriterBuilder().
+		SetMemorySizeLimit(20).
+		SetPropSizeDistance(5).
+		SetPropKeysDistance(3).
+		Build(store, 12, "/subtask")
+	err = w3.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvPairs))
+	require.NoError(t, err)
+	_, err = w3.Close(ctx)
+	require.NoError(t, err)
+
+	dataFiles, statFiles, err := GetAllFileNames(ctx, store, "/subtask")
+	require.NoError(t, err)
+	require.Equal(t, []string{
+		"/subtask/0_stat/0", "/subtask/0_stat/1", "/subtask/0_stat/2",
+		"/subtask/12_stat/0", "/subtask/12_stat/1", "/subtask/12_stat/2",
+		"/subtask/3_stat/0", "/subtask/3_stat/1", "/subtask/3_stat/2",
+	}, statFiles)
+	require.Equal(t, []string{
+		"/subtask/0/0", "/subtask/0/1", "/subtask/0/2",
+		"/subtask/12/0", "/subtask/12/1", "/subtask/12/2",
+		"/subtask/3/0", "/subtask/3/1", "/subtask/3/2",
+	}, dataFiles)
+}
diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go
index 79c6349953ba1..6fac1f3ea5371 100644
--- a/br/pkg/lightning/backend/external/writer.go
+++ b/br/pkg/lightning/backend/external/writer.go
@@ -19,11 +19,10 @@ import (
 	"context"
 	"encoding/hex"
 	"path/filepath"
+	"slices"
 	"strconv"
 	"time"
 
-	"slices"
-
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/br/pkg/lightning/backend"
 	"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
@@ -95,7 +94,9 @@ func NewWriterBuilder() *WriterBuilder {
 	}
 }
 
-// SetMemorySizeLimit sets the memory size limit of the writer.
+// SetMemorySizeLimit sets the memory size limit of the writer. When the
+// accumulated data size exceeds this limit, the writer flushes the buffered
+// data to external storage as one file.
 func (b *WriterBuilder) SetMemorySizeLimit(size uint64) *WriterBuilder {
 	b.memSizeLimit = size
 	return b
@@ -131,16 +132,18 @@ func (b *WriterBuilder) SetBufferPool(bufferPool *membuf.Pool) *WriterBuilder {
 	return b
 }
 
-// Build builds a new Writer.
+// Build builds a new Writer. The files the writer creates are placed under the
+// path prefix "{prefix}/{writerID}".
func (b *WriterBuilder) Build(
 	store storage.ExternalStorage,
 	writerID int,
-	filenamePrefix string,
+	prefix string,
 ) *Writer {
 	bp := b.bufferPool
 	if bp == nil {
 		bp = membuf.NewPool()
 	}
+	filenamePrefix := filepath.Join(prefix, strconv.Itoa(writerID))
 	return &Writer{
 		rc: &rangePropertiesCollector{
 			props: make([]*rangeProperty, 0, 1024),
@@ -262,6 +265,7 @@ func (w *Writer) flushKVs(ctx context.Context) (err error) {
 		return nil
 	}
 
+	logger := logutil.Logger(ctx)
 	dataWriter, statWriter, err := w.createStorageWriter(ctx)
 	if err != nil {
 		return err
@@ -277,16 +281,16 @@ func (w *Writer) flushKVs(ctx context.Context) (err error) {
 			return
 		}
 		if err1 != nil {
-			logutil.Logger(ctx).Error("close data writer failed", zap.Error(err))
+			logger.Error("close data writer failed", zap.Error(err))
 			err = err1
 			return
 		}
 		if err2 != nil {
-			logutil.Logger(ctx).Error("close stat writer failed", zap.Error(err))
+			logger.Error("close stat writer failed", zap.Error(err))
 			err = err2
 			return
 		}
-		logutil.Logger(ctx).Info("flush kv",
+		logger.Info("flush kv",
 			zap.Duration("time", time.Since(ts)),
 			zap.Uint64("bytes", savedBytes),
 			zap.Any("rate", float64(savedBytes)/1024.0/1024.0/time.Since(ts).Seconds()))
@@ -332,7 +336,7 @@ func (w *Writer) createStorageWriter(ctx context.Context) (data, stats storage.E
 	if err != nil {
 		return nil, nil, err
 	}
-	statPath := filepath.Join(w.filenamePrefix+"_stat", strconv.Itoa(w.currentSeq))
+	statPath := filepath.Join(w.filenamePrefix+statSuffix, strconv.Itoa(w.currentSeq))
 	statsWriter, err := w.store.Create(ctx, statPath, nil)
 	if err != nil {
 		return nil, nil, err
diff --git a/br/pkg/lightning/backend/external/writer_test.go b/br/pkg/lightning/backend/external/writer_test.go
index b540b82898516..d12afc3562ff9 100644
--- a/br/pkg/lightning/backend/external/writer_test.go
+++ b/br/pkg/lightning/backend/external/writer_test.go
@@ -67,7 +67,7 @@ func TestWriter(t *testing.T) {
 	})
 
 	bufSize := rand.Intn(100) + 1
-	kvReader, err := newKVReader(ctx, "/test/0", memStore, 0, bufSize)
+	kvReader, err := newKVReader(ctx, "/test/0/0", memStore, 0, bufSize)
 	require.NoError(t, err)
 	for i := 0; i < kvCnt; i++ {
 		key, value, err := kvReader.nextKV()
@@ -78,7 +78,7 @@ func TestWriter(t *testing.T) {
 	_, _, err = kvReader.nextKV()
 	require.Equal(t, io.EOF, err)
 
-	statReader, err := newStatsReader(ctx, memStore, "/test_stat/0", bufSize)
+	statReader, err := newStatsReader(ctx, memStore, "/test/0_stat/0", bufSize)
 	require.NoError(t, err)
 
 	var keyCnt uint64 = 0
@@ -136,7 +136,7 @@ func TestWriterFlushMultiFileNames(t *testing.T) {
 	require.Len(t, dataFiles, 4)
 	require.Len(t, statFiles, 4)
 	for i := 0; i < 4; i++ {
-		require.Equal(t, dataFiles[i], fmt.Sprintf("/test/%d", i))
-		require.Equal(t, statFiles[i], fmt.Sprintf("/test_stat/%d", i))
+		require.Equal(t, dataFiles[i], fmt.Sprintf("/test/0/%d", i))
+		require.Equal(t, statFiles[i], fmt.Sprintf("/test/0_stat/%d", i))
 	}
 }
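
Reviewer note: the sketch below is not part of the patch; it only illustrates how the new pieces compose, mirroring TestIter. A Writer flushes sorted KV pairs under "{prefix}/{writerID}", GetAllFileNames discovers the resulting data and stat files, and Engine.createMergeIter merge-reads them starting near a key. Because Engine's fields and createMergeIter are unexported, the sketch assumes it lives inside package external; the readBackSketch name, the "/subtask" prefix, and the writer ID are illustrative only.

package external

import (
	"context"

	lkv "github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
	"github.com/pingcap/tidb/br/pkg/lightning/common"
	"github.com/pingcap/tidb/br/pkg/storage"
	tidbkv "github.com/pingcap/tidb/kv"
)

// readBackSketch writes kvPairs to external storage as sorted data/stat files
// and then merge-reads everything back, starting from startKey.
func readBackSketch(
	ctx context.Context,
	store storage.ExternalStorage,
	kvPairs []common.KvPair,
	startKey tidbkv.Key,
) error {
	// Writer 0 places data files under "/subtask/0/{seq}" and stat files
	// under "/subtask/0_stat/{seq}".
	w := NewWriterBuilder().Build(store, 0, "/subtask")
	if err := w.AppendRows(ctx, nil, lkv.MakeRowsFromKvPairs(kvPairs)); err != nil {
		return err
	}
	if _, err := w.Close(ctx); err != nil {
		return err
	}

	// Collect every data and stat file written under the shared prefix.
	dataFiles, statFiles, err := GetAllFileNames(ctx, store, "/subtask")
	if err != nil {
		return err
	}

	// The stat files let the merge iterator seek each data file close to
	// startKey instead of scanning it from the beginning.
	engine := Engine{storage: store, dataFiles: dataFiles, statsFiles: statFiles}
	iter, err := engine.createMergeIter(ctx, startKey)
	if err != nil {
		return err
	}
	for iter.Next() {
		_, _ = iter.Key(), iter.Value() // consume the merged, ordered stream
	}
	return iter.Error()
}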