Skip to content

Commit

Permalink
*: optimize the layout of global sort files and other (#48275)
Browse files Browse the repository at this point in the history
ref #48779
  • Loading branch information
wjhuang2016 authored Nov 24, 2023
1 parent 8eb1913 commit 1263445
Show file tree
Hide file tree
Showing 17 changed files with 83 additions and 206 deletions.
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ go_test(
],
embed = [":external"],
flaky = True,
shard_count = 49,
shard_count = 47,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/external/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ func createEvenlyDistributedFiles(
kvCnt := 0
for i := 0; i < fileCount; i++ {
builder := NewWriterBuilder().
SetBlockSize(10 * 1024 * 1024).
SetMemorySizeLimit(uint64(float64(fileSize) * 1.1))
writer := builder.Build(
store,
Expand Down
42 changes: 12 additions & 30 deletions br/pkg/lightning/backend/external/byte_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@ import (
"context"
"io"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"go.uber.org/zap"
)

var (
// ConcurrentReaderBufferSizePerConc is the buffer size for the concurrent
// reader, per unit of concurrency.
ConcurrentReaderBufferSizePerConc = 4 * 1024 * 1024
ConcurrentReaderBufferSizePerConc = int(4 * size.MB)
// ConcurrentReaderConcurrency is the concurrency for concurrent reader.
ConcurrentReaderConcurrency = 8
)
Expand All @@ -44,8 +46,6 @@ type byteReader struct {
curBufOffset int
smallBuf []byte

retPointers []*[]byte

concurrentReader struct {
largeBufferPool *membuf.Buffer
store storage.ExternalStorage
Expand Down Expand Up @@ -193,24 +193,23 @@ func (r *byteReader) switchToConcurrentReader() error {
return nil
}

// readNBytes reads the next n bytes from the reader and returns a buffer slice containing those bytes.
// The returned slice (pointer) can not be used after r.reset. In the same interval of r.reset,
// byteReader guarantees that the returned slice (pointer) will point to the same content
// though the slice may be changed.
func (r *byteReader) readNBytes(n int) (*[]byte, error) {
// readNBytes reads the next n bytes from the reader and returns a buffer slice
// containing those bytes. The content of the returned slice may be changed by
// the next call.
func (r *byteReader) readNBytes(n int) ([]byte, error) {
b := r.next(n)
readLen := len(b)
if readLen == n {
ret := &b
r.retPointers = append(r.retPointers, ret)
return ret, nil
return b, nil
}
// If the reader has fewer than n bytes remaining in the current buffer,
// `auxBuf` is used as a container instead.
if n > int(size.GB) {
return nil, errors.Errorf("read %d bytes from external storage, exceed max limit %d", n, size.GB)
}
auxBuf := make([]byte, n)
copy(auxBuf, b)
for readLen < n {
r.cloneSlices()
err := r.reload()
switch err {
case nil:
Expand All @@ -226,24 +225,7 @@ func (r *byteReader) readNBytes(n int) (*[]byte, error) {
copy(auxBuf[readLen:], b)
readLen += len(b)
}
return &auxBuf, nil
}

func (r *byteReader) reset() {
for i := range r.retPointers {
r.retPointers[i] = nil
}
r.retPointers = r.retPointers[:0]
}

func (r *byteReader) cloneSlices() {
for i := range r.retPointers {
copied := make([]byte, len(*r.retPointers[i]))
copy(copied, *r.retPointers[i])
*r.retPointers[i] = copied
r.retPointers[i] = nil
}
r.retPointers = r.retPointers[:0]
return auxBuf, nil
}

func (r *byteReader) next(n int) []byte {
Expand Down
136 changes: 11 additions & 125 deletions br/pkg/lightning/backend/external/byte_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ func (s *mockExtStore) GetFileSize() (int64, error) {
}

func TestByteReader(t *testing.T) {
testByteReaderNormal(t, false)
testByteReaderNormal(t, true)
}

func testByteReaderNormal(t *testing.T, useConcurrency bool) {
st, clean := NewS3WithBucketAndPrefix(t, "test", "testprefix")
defer clean()

Expand Down Expand Up @@ -96,19 +91,17 @@ func testByteReaderNormal(t *testing.T, useConcurrency bool) {
// Test basic readNBytes() usage.
br, err = newByteReader(context.Background(), newRsc(), 3)
require.NoError(t, err)
y, err := br.readNBytes(2)
x, err = br.readNBytes(2)
require.NoError(t, err)
x = *y
require.Equal(t, 2, len(x))
require.Equal(t, byte('a'), x[0])
require.Equal(t, byte('b'), x[1])
require.NoError(t, br.Close())

br, err = newByteReader(context.Background(), newRsc(), 3)
require.NoError(t, err)
y, err = br.readNBytes(5) // Read all the data.
x, err = br.readNBytes(5) // Read all the data.
require.NoError(t, err)
x = *y
require.Equal(t, 5, len(x))
require.Equal(t, byte('e'), x[4])
require.NoError(t, br.Close())
Expand All @@ -124,69 +117,23 @@ func testByteReaderNormal(t *testing.T, useConcurrency bool) {
ms := &mockExtStore{src: []byte("abcdef")}
br, err = newByteReader(context.Background(), ms, 2)
require.NoError(t, err)
y, err = br.readNBytes(3)
x, err = br.readNBytes(3)
require.NoError(t, err)
// Pollute mockExtStore to verify if the slice is not affected.
copy(ms.src, []byte("xyz"))
x = *y
copy(ms.src, "xyz")
require.Equal(t, 3, len(x))
require.Equal(t, byte('c'), x[2])
require.NoError(t, br.Close())

ms = &mockExtStore{src: []byte("abcdef")}
br, err = newByteReader(context.Background(), ms, 2)
require.NoError(t, err)
y, err = br.readNBytes(2)
x, err = br.readNBytes(2)
require.NoError(t, err)
// Pollute mockExtStore to verify if the slice is not affected.
copy(ms.src, []byte("xyz"))
x = *y
copy(ms.src, "xyz")
require.Equal(t, 2, len(x))
require.Equal(t, byte('b'), x[1])
br.reset()
require.NoError(t, br.Close())
}

func TestByteReaderClone(t *testing.T) {
ms := &mockExtStore{src: []byte("0123456789")}
br, err := newByteReader(context.Background(), ms, 4)
require.NoError(t, err)
y1, err := br.readNBytes(2)
require.NoError(t, err)
y2, err := br.readNBytes(1)
require.NoError(t, err)
x1, x2 := *y1, *y2
require.Len(t, x1, 2)
require.Len(t, x2, 1)
require.Equal(t, byte('0'), x1[0])
require.Equal(t, byte('2'), x2[0])
require.NoError(t, br.reload()) // Perform a read to overwrite buffer.
x1, x2 = *y1, *y2
require.Len(t, x1, 2)
require.Len(t, x2, 1)
require.Equal(t, byte('4'), x1[0]) // Verify if the buffer is overwritten.
require.Equal(t, byte('6'), x2[0])
require.NoError(t, br.Close())

ms = &mockExtStore{src: []byte("0123456789")}
br, err = newByteReader(context.Background(), ms, 4)
require.NoError(t, err)
y1, err = br.readNBytes(2)
require.NoError(t, err)
y2, err = br.readNBytes(1)
require.NoError(t, err)
x1, x2 = *y1, *y2
require.Len(t, x1, 2)
require.Len(t, x2, 1)
require.Equal(t, byte('0'), x1[0])
require.Equal(t, byte('2'), x2[0])
br.cloneSlices()
require.NoError(t, br.reload()) // Perform a read to overwrite buffer.
x1, x2 = *y1, *y2
require.Len(t, x1, 2)
require.Len(t, x2, 1)
require.Equal(t, byte('0'), x1[0]) // Verify if the buffer is NOT overwritten.
require.Equal(t, byte('2'), x2[0])
require.NoError(t, br.Close())
}

Expand All @@ -196,78 +143,17 @@ func TestByteReaderAuxBuf(t *testing.T) {
require.NoError(t, err)
y1, err := br.readNBytes(1)
require.NoError(t, err)
require.Equal(t, []byte("0"), y1)
y2, err := br.readNBytes(2)
require.NoError(t, err)
require.Equal(t, []byte("0"), *y1)
require.Equal(t, []byte("12"), *y2)
require.Equal(t, []byte("12"), y2)

y3, err := br.readNBytes(1)
require.NoError(t, err)
require.Equal(t, []byte("3"), y3)
y4, err := br.readNBytes(2)
require.NoError(t, err)
require.Equal(t, []byte("3"), *y3)
require.Equal(t, []byte("45"), *y4)
require.Equal(t, []byte("0"), *y1)
require.Equal(t, []byte("12"), *y2)
}

func TestReset(t *testing.T) {
testReset(t, false)
testReset(t, true)
}

func testReset(t *testing.T, useConcurrency bool) {
st, clean := NewS3WithBucketAndPrefix(t, "test", "testprefix")
defer func() {
clean()
}()

seed := time.Now().Unix()
rand.Seed(uint64(seed))
t.Logf("seed: %d", seed)
src := make([]byte, 256)
for i := range src {
src[i] = byte(i)
}
// Prepare
err := st.WriteFile(context.Background(), "testfile", src)
require.NoError(t, err)

newRsc := func() storage.ExternalFileReader {
rsc, err := st.Open(context.Background(), "testfile", nil)
require.NoError(t, err)
return rsc
}
bufSize := rand.Intn(256)
br, err := newByteReader(context.Background(), newRsc(), bufSize)
require.NoError(t, err)
end := 0
toCheck := make([]*[]byte, 0, 10)
for end < len(src) {
n := rand.Intn(len(src) - end)
if n == 0 {
n = 1
}
y, err := br.readNBytes(n)
require.NoError(t, err)
toCheck = append(toCheck, y)
end += n

l := end
r := end
for i := len(toCheck) - 1; i >= 0; i-- {
l -= len(*toCheck[i])
require.Equal(t, src[l:r], *toCheck[i])
r = l
}

if rand.Intn(2) == 0 {
br.reset()
toCheck = toCheck[:0]
}
}
_, err = br.readNBytes(1)
require.Equal(t, io.EOF, err)
require.Equal(t, []byte("45"), y4)
}

func TestUnexpectedEOF(t *testing.T) {
Expand Down Expand Up @@ -366,7 +252,7 @@ func TestSwitchMode(t *testing.T) {
break
}
require.NoError(t, err)
totalCnt += len(*y)
totalCnt += len(y)
}
require.Equal(t, fileSize, totalCnt)

Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ func split[T any](in []T, groupNum int) [][]T {

func (e *Engine) getAdjustedConcurrency() int {
if e.checkHotspot {
// estimate we will open at most 1000 files, so if e.dataFiles is small we can
// estimate we will open at most 8000 files, so if e.dataFiles is small we can
// try to concurrently process ranges.
adjusted := maxCloudStorageConnections / len(e.dataFiles)
return min(adjusted, 8)
return min(adjusted, 16)
}
adjusted := min(e.workerConcurrency, maxCloudStorageConnections/len(e.dataFiles))
return max(adjusted, 1)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func TestGetAdjustedConcurrency(t *testing.T) {
workerConcurrency: 32,
dataFiles: genFiles(100),
}
require.Equal(t, 8, e.getAdjustedConcurrency())
require.Equal(t, 16, e.getAdjustedConcurrency())
e.dataFiles = genFiles(8000)
require.Equal(t, 1, e.getAdjustedConcurrency())

Expand Down
15 changes: 8 additions & 7 deletions br/pkg/lightning/backend/external/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,25 +50,26 @@ func NewKeyValueStore(
}

// addEncodedData saves encoded key-value pairs to the KeyValueStore.
// data layout: keyLen + key + valueLen + value. If the accumulated
// data layout: keyLen + valueLen + key + value. If the accumulated
// size or key count exceeds the given distance, a new range property will be
// appended to the rangePropertiesCollector with current status.
// `key` must be in strictly ascending order for invocations of a KeyValueStore.
func (s *KeyValueStore) addEncodedData(val []byte) error {
_, err := s.dataWriter.Write(s.ctx, val)
func (s *KeyValueStore) addEncodedData(data []byte) error {
_, err := s.dataWriter.Write(s.ctx, data)
if err != nil {
return err
}

keyLen := binary.BigEndian.Uint64(val)
key := val[lengthBytes : lengthBytes+keyLen]
keyLen := binary.BigEndian.Uint64(data)
key := data[2*lengthBytes : 2*lengthBytes+keyLen]

if len(s.rc.currProp.firstKey) == 0 {
s.rc.currProp.firstKey = key
}
s.rc.currProp.lastKey = key

s.offset += uint64(len(val))
s.rc.currProp.size += uint64(len(val) - lengthBytes*2)
s.offset += uint64(len(data))
s.rc.currProp.size += uint64(len(data) - 2*lengthBytes)
s.rc.currProp.keys++

if s.rc.currProp.size >= s.rc.propSizeDist ||
Expand Down
Loading

0 comments on commit 1263445

Please sign in to comment.