diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel
index 0280abc26c41a..7761221e3add8 100644
--- a/br/pkg/lightning/backend/external/BUILD.bazel
+++ b/br/pkg/lightning/backend/external/BUILD.bazel
@@ -65,7 +65,7 @@ go_test(
     ],
     embed = [":external"],
     flaky = True,
-    shard_count = 49,
+    shard_count = 47,
     deps = [
         "//br/pkg/lightning/backend/kv",
         "//br/pkg/lightning/common",
diff --git a/br/pkg/lightning/backend/external/bench_test.go b/br/pkg/lightning/backend/external/bench_test.go
index 548c8a7974d22..26e3dbd2e7d6c 100644
--- a/br/pkg/lightning/backend/external/bench_test.go
+++ b/br/pkg/lightning/backend/external/bench_test.go
@@ -345,6 +345,7 @@ func createEvenlyDistributedFiles(
 	kvCnt := 0
 	for i := 0; i < fileCount; i++ {
 		builder := NewWriterBuilder().
+			SetBlockSize(10 * 1024 * 1024).
 			SetMemorySizeLimit(uint64(float64(fileSize) * 1.1))
 		writer := builder.Build(
 			store,
diff --git a/br/pkg/lightning/backend/external/byte_reader.go b/br/pkg/lightning/backend/external/byte_reader.go
index 1293f6e0336f5..ce87741640f55 100644
--- a/br/pkg/lightning/backend/external/byte_reader.go
+++ b/br/pkg/lightning/backend/external/byte_reader.go
@@ -18,16 +18,18 @@ import (
 	"context"
 	"io"
 
+	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/br/pkg/membuf"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/pkg/util/logutil"
+	"github.com/pingcap/tidb/pkg/util/size"
 	"go.uber.org/zap"
 )
 
 var (
 	// ConcurrentReaderBufferSizePerConc is the buffer size for concurrent reader per
 	// concurrency.
-	ConcurrentReaderBufferSizePerConc = 4 * 1024 * 1024
+	ConcurrentReaderBufferSizePerConc = int(4 * size.MB)
 	// ConcurrentReaderConcurrency is the concurrency for concurrent reader.
 	ConcurrentReaderConcurrency = 8
 )
@@ -44,8 +46,6 @@ type byteReader struct {
 	curBufOffset int
 	smallBuf     []byte
 
-	retPointers []*[]byte
-
 	concurrentReader struct {
 		largeBufferPool *membuf.Buffer
 		store           storage.ExternalStorage
@@ -193,24 +193,23 @@ func (r *byteReader) switchToConcurrentReader() error {
 	return nil
 }
 
-// readNBytes reads the next n bytes from the reader and returns a buffer slice containing those bytes.
-// The returned slice (pointer) can not be used after r.reset. In the same interval of r.reset,
-// byteReader guarantees that the returned slice (pointer) will point to the same content
-// though the slice may be changed.
-func (r *byteReader) readNBytes(n int) (*[]byte, error) {
+// readNBytes reads the next n bytes from the reader and returns a buffer slice
+// containing those bytes. The content of the returned slice may be changed after
+// the next call.
+func (r *byteReader) readNBytes(n int) ([]byte, error) {
 	b := r.next(n)
 	readLen := len(b)
 	if readLen == n {
-		ret := &b
-		r.retPointers = append(r.retPointers, ret)
-		return ret, nil
+		return b, nil
 	}
 	// If the reader has fewer than n bytes remaining in current buffer,
 	// `auxBuf` is used as a container instead.
+	if n > int(size.GB) {
+		return nil, errors.Errorf("read %d bytes from external storage, exceed max limit %d", n, size.GB)
+	}
 	auxBuf := make([]byte, n)
 	copy(auxBuf, b)
 	for readLen < n {
-		r.cloneSlices()
 		err := r.reload()
 		switch err {
 		case nil:
@@ -226,24 +225,7 @@ func (r *byteReader) readNBytes(n int) (*[]byte, error) {
 		copy(auxBuf[readLen:], b)
 		readLen += len(b)
 	}
-	return &auxBuf, nil
-}
-
-func (r *byteReader) reset() {
-	for i := range r.retPointers {
-		r.retPointers[i] = nil
-	}
-	r.retPointers = r.retPointers[:0]
-}
-
-func (r *byteReader) cloneSlices() {
-	for i := range r.retPointers {
-		copied := make([]byte, len(*r.retPointers[i]))
-		copy(copied, *r.retPointers[i])
-		*r.retPointers[i] = copied
-		r.retPointers[i] = nil
-	}
-	r.retPointers = r.retPointers[:0]
+	return auxBuf, nil
 }
 
 func (r *byteReader) next(n int) []byte {
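Note: readNBytes now returns the underlying buffer slice directly instead of a *[]byte tracked by the removed retPointers/cloneSlices machinery, so the returned bytes are only valid until the following read. A minimal caller-side sketch of the new contract (the helper below is illustrative and not part of this PR; it assumes the package-internal byteReader type and an "encoding/binary" import):

	// decodeLen consumes one fixed-size length field. The slice returned by
	// readNBytes borrows the reader's internal buffer, so it must be decoded
	// (or explicitly copied) before the next readNBytes/reload call, which
	// may overwrite its contents.
	func decodeLen(r *byteReader) (uint64, error) {
		b, err := r.readNBytes(8)
		if err != nil {
			return 0, err
		}
		return binary.BigEndian.Uint64(b), nil // consumed before any further read
	}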
diff --git a/br/pkg/lightning/backend/external/byte_reader_test.go b/br/pkg/lightning/backend/external/byte_reader_test.go
index 71d281ec13f3b..b5ef2457073a4 100644
--- a/br/pkg/lightning/backend/external/byte_reader_test.go
+++ b/br/pkg/lightning/backend/external/byte_reader_test.go
@@ -63,11 +63,6 @@ func (s *mockExtStore) GetFileSize() (int64, error) {
 }
 
 func TestByteReader(t *testing.T) {
-	testByteReaderNormal(t, false)
-	testByteReaderNormal(t, true)
-}
-
-func testByteReaderNormal(t *testing.T, useConcurrency bool) {
 	st, clean := NewS3WithBucketAndPrefix(t, "test", "testprefix")
 	defer clean()
 
@@ -96,9 +91,8 @@ func testByteReaderNormal(t *testing.T, useConcurrency bool) {
 	// Test basic readNBytes() usage.
 	br, err = newByteReader(context.Background(), newRsc(), 3)
 	require.NoError(t, err)
-	y, err := br.readNBytes(2)
+	x, err = br.readNBytes(2)
 	require.NoError(t, err)
-	x = *y
 	require.Equal(t, 2, len(x))
 	require.Equal(t, byte('a'), x[0])
 	require.Equal(t, byte('b'), x[1])
@@ -106,9 +100,8 @@
 
 	br, err = newByteReader(context.Background(), newRsc(), 3)
 	require.NoError(t, err)
-	y, err = br.readNBytes(5) // Read all the data.
+	x, err = br.readNBytes(5) // Read all the data.
 	require.NoError(t, err)
-	x = *y
 	require.Equal(t, 5, len(x))
 	require.Equal(t, byte('e'), x[4])
 	require.NoError(t, br.Close())
@@ -124,11 +117,10 @@
 	ms := &mockExtStore{src: []byte("abcdef")}
 	br, err = newByteReader(context.Background(), ms, 2)
 	require.NoError(t, err)
-	y, err = br.readNBytes(3)
+	x, err = br.readNBytes(3)
 	require.NoError(t, err)
 	// Pollute mockExtStore to verify if the slice is not affected.
-	copy(ms.src, []byte("xyz"))
-	x = *y
+	copy(ms.src, "xyz")
 	require.Equal(t, 3, len(x))
 	require.Equal(t, byte('c'), x[2])
 	require.NoError(t, br.Close())
@@ -136,57 +128,12 @@
 	ms = &mockExtStore{src: []byte("abcdef")}
 	br, err = newByteReader(context.Background(), ms, 2)
 	require.NoError(t, err)
-	y, err = br.readNBytes(2)
+	x, err = br.readNBytes(2)
 	require.NoError(t, err)
 	// Pollute mockExtStore to verify if the slice is not affected.
-	copy(ms.src, []byte("xyz"))
-	x = *y
+	copy(ms.src, "xyz")
 	require.Equal(t, 2, len(x))
 	require.Equal(t, byte('b'), x[1])
-	br.reset()
-	require.NoError(t, br.Close())
-}
-
-func TestByteReaderClone(t *testing.T) {
-	ms := &mockExtStore{src: []byte("0123456789")}
-	br, err := newByteReader(context.Background(), ms, 4)
-	require.NoError(t, err)
-	y1, err := br.readNBytes(2)
-	require.NoError(t, err)
-	y2, err := br.readNBytes(1)
-	require.NoError(t, err)
-	x1, x2 := *y1, *y2
-	require.Len(t, x1, 2)
-	require.Len(t, x2, 1)
-	require.Equal(t, byte('0'), x1[0])
-	require.Equal(t, byte('2'), x2[0])
-	require.NoError(t, br.reload()) // Perform a read to overwrite buffer.
-	x1, x2 = *y1, *y2
-	require.Len(t, x1, 2)
-	require.Len(t, x2, 1)
-	require.Equal(t, byte('4'), x1[0]) // Verify if the buffer is overwritten.
-	require.Equal(t, byte('6'), x2[0])
-	require.NoError(t, br.Close())
-
-	ms = &mockExtStore{src: []byte("0123456789")}
-	br, err = newByteReader(context.Background(), ms, 4)
-	require.NoError(t, err)
-	y1, err = br.readNBytes(2)
-	require.NoError(t, err)
-	y2, err = br.readNBytes(1)
-	require.NoError(t, err)
-	x1, x2 = *y1, *y2
-	require.Len(t, x1, 2)
-	require.Len(t, x2, 1)
-	require.Equal(t, byte('0'), x1[0])
-	require.Equal(t, byte('2'), x2[0])
-	br.cloneSlices()
-	require.NoError(t, br.reload()) // Perform a read to overwrite buffer.
-	x1, x2 = *y1, *y2
-	require.Len(t, x1, 2)
-	require.Len(t, x2, 1)
-	require.Equal(t, byte('0'), x1[0]) // Verify if the buffer is NOT overwritten.
-	require.Equal(t, byte('2'), x2[0])
 	require.NoError(t, br.Close())
 }
 
@@ -196,78 +143,17 @@ func TestByteReaderAuxBuf(t *testing.T) {
 	require.NoError(t, err)
 	y1, err := br.readNBytes(1)
 	require.NoError(t, err)
+	require.Equal(t, []byte("0"), y1)
 	y2, err := br.readNBytes(2)
 	require.NoError(t, err)
-	require.Equal(t, []byte("0"), *y1)
-	require.Equal(t, []byte("12"), *y2)
+	require.Equal(t, []byte("12"), y2)
 	y3, err := br.readNBytes(1)
 	require.NoError(t, err)
+	require.Equal(t, []byte("3"), y3)
 	y4, err := br.readNBytes(2)
 	require.NoError(t, err)
-	require.Equal(t, []byte("3"), *y3)
-	require.Equal(t, []byte("45"), *y4)
-	require.Equal(t, []byte("0"), *y1)
-	require.Equal(t, []byte("12"), *y2)
-}
-
-func TestReset(t *testing.T) {
-	testReset(t, false)
-	testReset(t, true)
-}
-
-func testReset(t *testing.T, useConcurrency bool) {
-	st, clean := NewS3WithBucketAndPrefix(t, "test", "testprefix")
-	defer func() {
-		clean()
-	}()
-
-	seed := time.Now().Unix()
-	rand.Seed(uint64(seed))
-	t.Logf("seed: %d", seed)
-	src := make([]byte, 256)
-	for i := range src {
-		src[i] = byte(i)
-	}
-	// Prepare
-	err := st.WriteFile(context.Background(), "testfile", src)
-	require.NoError(t, err)
-
-	newRsc := func() storage.ExternalFileReader {
-		rsc, err := st.Open(context.Background(), "testfile", nil)
-		require.NoError(t, err)
-		return rsc
-	}
-	bufSize := rand.Intn(256)
-	br, err := newByteReader(context.Background(), newRsc(), bufSize)
-	require.NoError(t, err)
-	end := 0
-	toCheck := make([]*[]byte, 0, 10)
-	for end < len(src) {
-		n := rand.Intn(len(src) - end)
-		if n == 0 {
-			n = 1
-		}
-		y, err := br.readNBytes(n)
-		require.NoError(t, err)
-		toCheck = append(toCheck, y)
-		end += n
-
-		l := end
-		r := end
-		for i := len(toCheck) - 1; i >= 0; i-- {
-			l -= len(*toCheck[i])
-			require.Equal(t, src[l:r], *toCheck[i])
-			r = l
-		}
-
-		if rand.Intn(2) == 0 {
-			br.reset()
-			toCheck = toCheck[:0]
-		}
-	}
-	_, err = br.readNBytes(1)
-	require.Equal(t, io.EOF, err)
+	require.Equal(t, []byte("45"), y4)
 }
 
 func TestUnexpectedEOF(t *testing.T) {
@@ -366,7 +252,7 @@ func TestSwitchMode(t *testing.T) {
 			break
 		}
 		require.NoError(t, err)
-		totalCnt += len(*y)
+		totalCnt += len(y)
 	}
 	require.Equal(t, fileSize, totalCnt)
 
diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go
index 4465534053917..1b60434287b47 100644
--- a/br/pkg/lightning/backend/external/engine.go
+++ b/br/pkg/lightning/backend/external/engine.go
@@ -143,10 +143,10 @@ func split[T any](in []T, groupNum int) [][]T {
 
 func (e *Engine) getAdjustedConcurrency() int {
 	if e.checkHotspot {
-		// estimate we will open at most 1000 files, so if e.dataFiles is small we can
+		// estimate we will open at most 8000 files, so if e.dataFiles is small we can
 		// try to concurrently process ranges.
 		adjusted := maxCloudStorageConnections / len(e.dataFiles)
-		return min(adjusted, 8)
+		return min(adjusted, 16)
 	}
 	adjusted := min(e.workerConcurrency, maxCloudStorageConnections/len(e.dataFiles))
 	return max(adjusted, 1)
diff --git a/br/pkg/lightning/backend/external/engine_test.go b/br/pkg/lightning/backend/external/engine_test.go
index deb33d4b0fde7..156b6807897bb 100644
--- a/br/pkg/lightning/backend/external/engine_test.go
+++ b/br/pkg/lightning/backend/external/engine_test.go
@@ -337,7 +337,7 @@ func TestGetAdjustedConcurrency(t *testing.T) {
 		workerConcurrency: 32,
 		dataFiles:         genFiles(100),
 	}
-	require.Equal(t, 8, e.getAdjustedConcurrency())
+	require.Equal(t, 16, e.getAdjustedConcurrency())
 	e.dataFiles = genFiles(8000)
 	require.Equal(t, 1, e.getAdjustedConcurrency())
 
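Note: the hotspot-path concurrency cap doubles from 8 to 16. Assuming maxCloudStorageConnections is 8000 (an assumption inferred from the updated comment and TestGetAdjustedConcurrency above, not stated in this diff), the arithmetic works out as in this sketch:

	// Sketch of the hotspot branch only; the constant is an inferred assumption.
	const maxCloudStorageConnections = 8000

	func adjustedConcurrency(numFiles int) int {
		adjusted := maxCloudStorageConnections / numFiles
		return min(adjusted, 16) // cap raised from 8 to 16 in this diff
	}

	// adjustedConcurrency(100)  == 16 (8000/100 = 80, capped at 16)
	// adjustedConcurrency(8000) == 1  (matches the updated test expectation)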
diff --git a/br/pkg/lightning/backend/external/file.go b/br/pkg/lightning/backend/external/file.go
index fd66938a03a0a..a625248e61f31 100644
--- a/br/pkg/lightning/backend/external/file.go
+++ b/br/pkg/lightning/backend/external/file.go
@@ -50,25 +50,26 @@ func NewKeyValueStore(
 }
 
 // addEncodedData saves encoded key-value pairs to the KeyValueStore.
-// data layout: keyLen + key + valueLen + value. If the accumulated
+// data layout: keyLen + valueLen + key + value. If the accumulated
 // size or key count exceeds the given distance, a new range property will be
 // appended to the rangePropertiesCollector with current status.
 // `key` must be in strictly ascending order for invocations of a KeyValueStore.
-func (s *KeyValueStore) addEncodedData(val []byte) error {
-	_, err := s.dataWriter.Write(s.ctx, val)
+func (s *KeyValueStore) addEncodedData(data []byte) error {
+	_, err := s.dataWriter.Write(s.ctx, data)
 	if err != nil {
 		return err
 	}
-	keyLen := binary.BigEndian.Uint64(val)
-	key := val[lengthBytes : lengthBytes+keyLen]
+	keyLen := binary.BigEndian.Uint64(data)
+	key := data[2*lengthBytes : 2*lengthBytes+keyLen]
+
 	if len(s.rc.currProp.firstKey) == 0 {
 		s.rc.currProp.firstKey = key
 	}
 	s.rc.currProp.lastKey = key
 
-	s.offset += uint64(len(val))
-	s.rc.currProp.size += uint64(len(val) - lengthBytes*2)
+	s.offset += uint64(len(data))
+	s.rc.currProp.size += uint64(len(data) - 2*lengthBytes)
 	s.rc.currProp.keys++
 
 	if s.rc.currProp.size >= s.rc.propSizeDist ||
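Note: the layout change moves both length fields in front of the payload, keyLen (8 bytes) | valueLen (8 bytes) | key | value, so a reader learns both lengths without first seeking past the key. Worked example using getEncodedData, the test helper added in file_test.go below (byte values shown for illustration):

	buf := getEncodedData([]byte("ab"), []byte("xyz"))
	// len(buf) == 8 + 8 + 2 + 3 == 21, laid out as:
	//   00 00 00 00 00 00 00 02   keyLen   = 2
	//   00 00 00 00 00 00 00 03   valueLen = 3
	//   61 62                     "ab"
	//   78 79 7a                  "xyz"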
diff --git a/br/pkg/lightning/backend/external/file_test.go b/br/pkg/lightning/backend/external/file_test.go
index 65ab999b17476..f58be194c0988 100644
--- a/br/pkg/lightning/backend/external/file_test.go
+++ b/br/pkg/lightning/backend/external/file_test.go
@@ -16,6 +16,7 @@ package external
 
 import (
 	"context"
+	"encoding/binary"
 	"io"
 	"testing"
 	"time"
@@ -25,6 +26,15 @@ import (
 	"golang.org/x/exp/rand"
 )
 
+func getEncodedData(key, value []byte) []byte {
+	buf := make([]byte, 8*2+len(key)+len(value))
+	binary.BigEndian.PutUint64(buf, uint64(len(key)))
+	binary.BigEndian.PutUint64(buf[8:], uint64(len(value)))
+	copy(buf[8*2:], key)
+	copy(buf[8*2+len(key):], value)
+	return buf
+}
+
 func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
 	ctx := context.Background()
 	memStore := storage.NewMemStorage()
@@ -71,9 +81,9 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
 	require.NoError(t, err)
 	require.Len(t, rc.props, 1)
 
+	kvStore.Close()
 	err = writer.Close(ctx)
 	require.NoError(t, err)
-	kvStore.Close()
 	expected = &rangeProperty{
 		firstKey: k3,
 		lastKey:  k3,
@@ -152,6 +162,7 @@ func TestKVReadWrite(t *testing.T) {
 		err = kvStore.addEncodedData(getEncodedData(keys[i], values[i]))
 		require.NoError(t, err)
 	}
+	kvStore.Close()
 	err = writer.Close(ctx)
 	require.NoError(t, err)
 
diff --git a/br/pkg/lightning/backend/external/iter.go b/br/pkg/lightning/backend/external/iter.go
index 547d147132f1d..0847e7bad1f98 100644
--- a/br/pkg/lightning/backend/external/iter.go
+++ b/br/pkg/lightning/backend/external/iter.go
@@ -290,6 +290,9 @@ func (i *mergeIter[T, R]) next() bool {
 			i.readers[i.lastReaderIdx] = nil
 			delete(i.hotspotMap, i.lastReaderIdx)
 		default:
+			i.logger.Error("failed to read next element",
+				zap.String("path", rd.path()),
+				zap.Error(err))
 			i.err = err
 			return false
 		}
@@ -366,7 +369,7 @@ func NewMergeKVIter(
 	readerOpeners := make([]readerOpenerFn[*kvPair, kvReaderProxy], 0, len(paths))
 	largeBufSize := ConcurrentReaderBufferSizePerConc * ConcurrentReaderConcurrency
 	memPool := membuf.NewPool(
-		membuf.WithPoolSize(1), // currently only one reader will become hotspot
+		membuf.WithBlockNum(1), // currently only one reader will become hotspot
 		membuf.WithBlockSize(largeBufSize),
 		membuf.WithLargeAllocThreshold(largeBufSize),
 	)
diff --git a/br/pkg/lightning/backend/external/iter_test.go b/br/pkg/lightning/backend/external/iter_test.go
index e9b27f3793b6e..3a1db464ba827 100644
--- a/br/pkg/lightning/backend/external/iter_test.go
+++ b/br/pkg/lightning/backend/external/iter_test.go
@@ -60,15 +60,6 @@ func (r *trackOpenFileReader) Close() error {
 	return nil
 }
 
-func getEncodedData(key, value []byte) []byte {
-	buf := make([]byte, 8*2+len(key)+len(value))
-	binary.BigEndian.PutUint64(buf, uint64(len(key)))
-	copy(buf[8:], key)
-	binary.BigEndian.PutUint64(buf[8+len(key):], uint64(len(value)))
-	copy(buf[8*2+len(key):], value)
-	return buf
-}
-
 func TestMergeKVIter(t *testing.T) {
 	ctx := context.Background()
 	memStore := storage.NewMemStorage()
@@ -92,6 +83,7 @@ func TestMergeKVIter(t *testing.T) {
 		err = kvStore.addEncodedData(getEncodedData([]byte(kv[0]), []byte(kv[1])))
 		require.NoError(t, err)
 	}
+	kvStore.Close()
 	err = writer.Close(ctx)
 	require.NoError(t, err)
 }
@@ -144,6 +136,7 @@ func TestOneUpstream(t *testing.T) {
 		err = kvStore.addEncodedData(getEncodedData([]byte(kv[0]), []byte(kv[1])))
 		require.NoError(t, err)
 	}
+	kvStore.Close()
 	err = writer.Close(ctx)
 	require.NoError(t, err)
 }
@@ -222,6 +215,7 @@ func TestCorruptContent(t *testing.T) {
 		err = kvStore.addEncodedData(getEncodedData([]byte(kv[0]), []byte(kv[1])))
 		require.NoError(t, err)
 	}
+	kvStore.Close()
 	if i == 0 {
 		_, err = writer.Write(ctx, []byte("corrupt"))
 		require.NoError(t, err)
@@ -390,6 +384,7 @@ func TestHotspot(t *testing.T) {
 		err = kvStore.addEncodedData(getEncodedData([]byte(k), value))
 		require.NoError(t, err)
 	}
+	kvStore.Close()
 	err = writer.Close(ctx)
 	require.NoError(t, err)
 }
diff --git a/br/pkg/lightning/backend/external/kv_reader.go b/br/pkg/lightning/backend/external/kv_reader.go
index 3f295f3f4252c..def354b18f884 100644
--- a/br/pkg/lightning/backend/external/kv_reader.go
+++ b/br/pkg/lightning/backend/external/kv_reader.go
@@ -51,26 +51,21 @@ func newKVReader(
 }
 
 func (r *kvReader) nextKV() (key, val []byte, err error) {
-	r.byteReader.reset()
 	lenBytes, err := r.byteReader.readNBytes(8)
 	if err != nil {
 		return nil, nil, err
 	}
-	keyLen := int(binary.BigEndian.Uint64(*lenBytes))
-	keyPtr, err := r.byteReader.readNBytes(keyLen)
-	if err != nil {
-		return nil, nil, noEOF(err)
-	}
+	keyLen := int(binary.BigEndian.Uint64(lenBytes))
 	lenBytes, err = r.byteReader.readNBytes(8)
 	if err != nil {
 		return nil, nil, noEOF(err)
 	}
-	valLen := int(binary.BigEndian.Uint64(*lenBytes))
-	valPtr, err := r.byteReader.readNBytes(valLen)
+	valLen := int(binary.BigEndian.Uint64(lenBytes))
+	keyAndValue, err := r.byteReader.readNBytes(keyLen + valLen)
 	if err != nil {
 		return nil, nil, noEOF(err)
 	}
-	return *keyPtr, *valPtr, nil
+	return keyAndValue[:keyLen], keyAndValue[keyLen:], nil
 }
 
 // noEOF converts the EOF error to io.ErrUnexpectedEOF.
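Note: with both lengths up front, nextKV fetches the key and value in a single readNBytes call and slices the result, instead of issuing a separate read per field as the old layout required. A standalone sketch of the same decode order over an in-memory buffer (illustrative helper, not PR code; assumes "encoding/binary" is imported):

	// decodePair mirrors the new nextKV read order on a raw encoded buffer:
	// two 8-byte length fields, then one contiguous key+value region.
	func decodePair(buf []byte) (key, val []byte) {
		keyLen := binary.BigEndian.Uint64(buf)
		valLen := binary.BigEndian.Uint64(buf[8:])
		kv := buf[16 : 16+keyLen+valLen]
		return kv[:keyLen], kv[keyLen:]
	}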
diff --git a/br/pkg/lightning/backend/external/onefile_writer.go b/br/pkg/lightning/backend/external/onefile_writer.go
index b0dea86661744..996f336909fbc 100644
--- a/br/pkg/lightning/backend/external/onefile_writer.go
+++ b/br/pkg/lightning/backend/external/onefile_writer.go
@@ -106,10 +106,13 @@ func (w *OneFileWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte) err
 		w.rc.reset()
 	}
 	binary.BigEndian.AppendUint64(buf[:0], uint64(keyLen))
-	copy(buf[lengthBytes:], idxKey)
-	binary.BigEndian.AppendUint64(buf[lengthBytes+keyLen:lengthBytes+keyLen], uint64(len(idxVal)))
+	binary.BigEndian.AppendUint64(buf[lengthBytes:lengthBytes], uint64(len(idxVal)))
+	copy(buf[lengthBytes*2:], idxKey)
 	copy(buf[lengthBytes*2+keyLen:], idxVal)
-	w.kvStore.addEncodedData(buf[:length])
+	err := w.kvStore.addEncodedData(buf[:length])
+	if err != nil {
+		return err
+	}
 	w.totalSize += uint64(keyLen + len(idxVal))
 	return nil
 }
diff --git a/br/pkg/lightning/backend/external/split_test.go b/br/pkg/lightning/backend/external/split_test.go
index a49f697b46116..738e0a021ec9f 100644
--- a/br/pkg/lightning/backend/external/split_test.go
+++ b/br/pkg/lightning/backend/external/split_test.go
@@ -102,13 +102,15 @@ func TestOnlyOneGroup(t *testing.T) {
 	subDir := "/mock-test"
 
 	writer := NewWriterBuilder().
-		SetMemorySizeLimit(15).
+		SetMemorySizeLimit(20).
 		SetPropSizeDistance(1).
 		SetPropKeysDistance(1).
 		Build(memStore, subDir, "5")
 	dataFiles, statFiles, err := MockExternalEngineWithWriter(memStore, writer, subDir,
 		[][]byte{{1}, {2}}, [][]byte{{1}, {2}})
 	require.NoError(t, err)
+	require.Len(t, dataFiles, 1)
+	require.Len(t, statFiles, 1)
 
 	splitter, err := NewRangeSplitter(
 		ctx, dataFiles, statFiles, memStore, 1000, 30, 1000, 10, true,
diff --git a/br/pkg/lightning/backend/external/stat_reader.go b/br/pkg/lightning/backend/external/stat_reader.go
index a921a7f7a098b..5aaea5988973e 100644
--- a/br/pkg/lightning/backend/external/stat_reader.go
+++ b/br/pkg/lightning/backend/external/stat_reader.go
@@ -40,17 +40,16 @@ func newStatsReader(ctx context.Context, store storage.ExternalStorage, name str
 }
 
 func (r *statsReader) nextProp() (*rangeProperty, error) {
-	r.byteReader.reset()
 	lenBytes, err := r.byteReader.readNBytes(4)
 	if err != nil {
 		return nil, err
 	}
-	propLen := int(binary.BigEndian.Uint32(*lenBytes))
+	propLen := int(binary.BigEndian.Uint32(lenBytes))
 	propBytes, err := r.byteReader.readNBytes(propLen)
 	if err != nil {
 		return nil, noEOF(err)
 	}
-	return decodeProp(*propBytes), nil
+	return decodeProp(propBytes), nil
 }
 
 func (r *statsReader) Close() error {
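Note: the OneFileWriter.WriteRow encoder above (and the Writer.WriteRow change below) lean on an append-into-capacity idiom: binary.BigEndian.AppendUint64(buf[n:n], v) appends to a zero-length slice that still shares buf's backing array, so the 8 bytes land in buf[n:n+8] without allocating. A standalone illustration of that idiom (values arbitrary):

	package main

	import (
		"encoding/binary"
		"fmt"
	)

	func main() {
		buf := make([]byte, 16)
		// buf[8:8] has len 0 and cap 8; AppendUint64 writes into buf[8:16]
		// through the shared backing array and allocates nothing new.
		binary.BigEndian.AppendUint64(buf[8:8], 42)
		fmt.Println(binary.BigEndian.Uint64(buf[8:])) // 42
	}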
diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go
index 6556da432b784..6468e1413a8dd 100644
--- a/br/pkg/lightning/backend/external/writer.go
+++ b/br/pkg/lightning/backend/external/writer.go
@@ -179,7 +179,7 @@ func (b *WriterBuilder) Build(
 	if b.keyDupeEncoding {
 		keyAdapter = common.DupDetectKeyAdapter{}
 	}
-	p := membuf.NewPool(membuf.WithPoolSize(0), membuf.WithBlockSize(b.blockSize))
+	p := membuf.NewPool(membuf.WithBlockNum(0), membuf.WithBlockSize(b.blockSize))
 	ret := &Writer{
 		rc: &rangePropertiesCollector{
 			props: make([]*rangeProperty, 0, 1024),
@@ -214,7 +214,7 @@ func (b *WriterBuilder) BuildOneFile(
 	writerID string,
 ) *OneFileWriter {
 	filenamePrefix := filepath.Join(prefix, writerID)
-	p := membuf.NewPool(membuf.WithPoolSize(0), membuf.WithBlockSize(b.blockSize))
+	p := membuf.NewPool(membuf.WithBlockNum(0), membuf.WithBlockSize(b.blockSize))
 
 	ret := &OneFileWriter{
 		rc: &rangePropertiesCollector{
@@ -339,9 +339,9 @@ func (w *Writer) WriteRow(ctx context.Context, idxKey, idxVal []byte, handle tid
 		}
 	}
 	binary.BigEndian.AppendUint64(dataBuf[:0], uint64(encodedKeyLen))
-	keyAdapter.Encode(dataBuf[lengthBytes:lengthBytes:lengthBytes+encodedKeyLen], idxKey, rowID)
-	binary.BigEndian.AppendUint64(dataBuf[lengthBytes+encodedKeyLen:lengthBytes+encodedKeyLen], uint64(len(idxVal)))
-	copy(dataBuf[lengthBytes*2+encodedKeyLen:], idxVal)
+	binary.BigEndian.AppendUint64(dataBuf[:lengthBytes], uint64(len(idxVal)))
+	keyAdapter.Encode(dataBuf[2*lengthBytes:2*lengthBytes:2*lengthBytes+encodedKeyLen], idxKey, rowID)
+	copy(dataBuf[2*lengthBytes+encodedKeyLen:], idxVal)
 	w.kvLocations = append(w.kvLocations, loc)
 	w.kvSize += int64(encodedKeyLen + len(idxVal))
 
@@ -377,7 +377,6 @@ func (w *Writer) Close(ctx context.Context) error {
 		zap.String("maxKey", hex.EncodeToString(w.maxKey)))
 
 	w.kvLocations = nil
-
 	w.onClose(&WriterSummary{
 		WriterID: w.writerID,
 		Seq:      w.currentSeq,
@@ -513,7 +512,7 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
 func (w *Writer) getKeyByLoc(loc membuf.SliceLocation) []byte {
 	block := w.kvBuffer.GetSlice(loc)
 	keyLen := binary.BigEndian.Uint64(block[:lengthBytes])
-	return block[lengthBytes : lengthBytes+keyLen]
+	return block[2*lengthBytes : 2*lengthBytes+keyLen]
 }
 
 func (w *Writer) createStorageWriter(ctx context.Context) (
diff --git a/br/pkg/membuf/buffer.go b/br/pkg/membuf/buffer.go
index 3a999d75e7404..58252f692e9d2 100644
--- a/br/pkg/membuf/buffer.go
+++ b/br/pkg/membuf/buffer.go
@@ -49,17 +49,17 @@ type Pool struct {
 
 // Option configures a pool.
 type Option func(p *Pool)
 
-// WithPoolSize configures how many blocks cached by this pool.
-func WithPoolSize(size int) Option {
+// WithBlockNum configures how many blocks are cached by this pool.
+func WithBlockNum(num int) Option {
 	return func(p *Pool) {
-		p.blockCache = make(chan []byte, size)
+		p.blockCache = make(chan []byte, num)
 	}
 }
 
 // WithBlockSize configures the size of each block.
-func WithBlockSize(size int) Option {
+func WithBlockSize(bytes int) Option {
 	return func(p *Pool) {
-		p.blockSize = size
+		p.blockSize = bytes
 	}
 }
diff --git a/br/pkg/membuf/buffer_test.go b/br/pkg/membuf/buffer_test.go
index 44e8441066985..1051e43e11b0e 100644
--- a/br/pkg/membuf/buffer_test.go
+++ b/br/pkg/membuf/buffer_test.go
@@ -42,7 +42,7 @@ func (t *testAllocator) Free(_ []byte) {
 func TestBufferPool(t *testing.T) {
 	allocator := &testAllocator{}
 	pool := NewPool(
-		WithPoolSize(2),
+		WithBlockNum(2),
 		WithAllocator(allocator),
 		WithBlockSize(1024),
 		WithLargeAllocThreshold(512),
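Note: WithPoolSize is renamed to WithBlockNum and WithBlockSize's parameter to bytes, so the two knobs read unambiguously as block count versus block size. Pool construction after the rename looks like this (sizes are illustrative; only the option names come from this diff):

	pool := membuf.NewPool(
		membuf.WithBlockNum(4),      // cache up to 4 blocks in the pool
		membuf.WithBlockSize(1<<20), // 1 MiB per block
	)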