Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: optimize the layout of global sort files and other #48275

Merged
merged 64 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
a2e73b8
try
wjhuang2016 Nov 3, 2023
964adab
try
wjhuang2016 Nov 3, 2023
09c267e
fix
wjhuang2016 Nov 3, 2023
0ffe2e0
try
wjhuang2016 Nov 6, 2023
c4074b3
try
wjhuang2016 Nov 6, 2023
280c20f
try
wjhuang2016 Nov 6, 2023
fffd17c
try
wjhuang2016 Nov 6, 2023
74a817c
try
wjhuang2016 Nov 6, 2023
19c4e1e
fix
wjhuang2016 Nov 6, 2023
619385d
try
wjhuang2016 Nov 6, 2023
3450098
try
wjhuang2016 Nov 6, 2023
1bdde22
fix
wjhuang2016 Nov 6, 2023
11752f2
try
wjhuang2016 Nov 6, 2023
6649072
fix
wjhuang2016 Nov 6, 2023
262d459
try
wjhuang2016 Nov 6, 2023
454a032
try
wjhuang2016 Nov 6, 2023
8f4cfd4
try
wjhuang2016 Nov 7, 2023
260022a
refine
wjhuang2016 Nov 7, 2023
d1307df
fix
wjhuang2016 Nov 7, 2023
da1d580
try
wjhuang2016 Nov 7, 2023
72b4896
try
wjhuang2016 Nov 7, 2023
6d16e4a
try
wjhuang2016 Nov 7, 2023
839cef4
try
wjhuang2016 Nov 8, 2023
ac1a039
refine
wjhuang2016 Nov 8, 2023
68d3c74
try
wjhuang2016 Nov 8, 2023
292fc71
try
wjhuang2016 Nov 8, 2023
75936fd
done
wjhuang2016 Nov 8, 2023
73e8390
Revert "done"
wjhuang2016 Nov 9, 2023
ffdbffc
Revert "try"
wjhuang2016 Nov 9, 2023
f3b402c
refine
wjhuang2016 Nov 9, 2023
9cdef6a
refine
wjhuang2016 Nov 9, 2023
9209252
revert
wjhuang2016 Nov 9, 2023
17a5e5f
fix ut
wjhuang2016 Nov 9, 2023
2ac3305
fix UT
wjhuang2016 Nov 9, 2023
0627afd
fix test
lance6716 Nov 9, 2023
f50e8e1
refine
wjhuang2016 Nov 10, 2023
53bb454
Merge branch 'test_master' of github.com:wjhuang2016/tidb into test_m…
wjhuang2016 Nov 10, 2023
e29528f
Merge branch 'master' of github.com:pingcap/tidb into test_master
lance6716 Nov 14, 2023
e29ccad
modify some test files
lance6716 Nov 14, 2023
85efa4e
rename
lance6716 Nov 14, 2023
bcbccee
Merge branch 'master' of github.com:pingcap/tidb into test_master
lance6716 Nov 14, 2023
c0eef91
remove more UT
lance6716 Nov 14, 2023
8439c90
fix CI
lance6716 Nov 14, 2023
bf6f356
fix bench size
lance6716 Nov 14, 2023
0404938
membuf: add SliceLocation to reduce GC pressure
lance6716 Nov 20, 2023
b5c11ae
fix wrong estimation
lance6716 Nov 20, 2023
07ff503
delete kvLocation
lance6716 Nov 20, 2023
b47101a
fix bazel
lance6716 Nov 20, 2023
0c307ac
fix CI
lance6716 Nov 20, 2023
8ef52b1
fix ut
lance6716 Nov 20, 2023
e9c9fc3
fix UT
lance6716 Nov 20, 2023
56365c9
fix UT
lance6716 Nov 20, 2023
b07c24c
fix UT
lance6716 Nov 20, 2023
e7fd45b
Merge branch 'store-loc' into test_master
lance6716 Nov 21, 2023
4263795
fix test
lance6716 Nov 21, 2023
c08e24d
Merge branch 'master' of github.com:pingcap/tidb into test_master
lance6716 Nov 22, 2023
d61393d
fix comment
lance6716 Nov 22, 2023
cccdeeb
remove membuf
lance6716 Nov 22, 2023
8ce124e
fix CI
lance6716 Nov 22, 2023
5375d41
revert some change
lance6716 Nov 22, 2023
56063a3
Merge branch 'master' of github.com:pingcap/tidb into test_master
lance6716 Nov 23, 2023
00d28ad
fix build
lance6716 Nov 23, 2023
6e59d90
address comment
lance6716 Nov 23, 2023
56337f1
address comment
lance6716 Nov 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ go_test(
],
embed = [":external"],
flaky = True,
shard_count = 45,
shard_count = 43,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/external/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ func createEvenlyDistributedFiles(
kvCnt := 0
for i := 0; i < fileCount; i++ {
builder := NewWriterBuilder().
SetBlockSize(10 * 1024 * 1024).
SetMemorySizeLimit(uint64(float64(fileSize) * 1.1))
writer := builder.Build(
store,
Expand Down
39 changes: 10 additions & 29 deletions br/pkg/lightning/backend/external/byte_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"io"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/util/logutil"
Expand All @@ -44,8 +45,6 @@ type byteReader struct {
curBufOffset int
smallBuf []byte

retPointers []*[]byte

concurrentReader struct {
largeBufferPool *membuf.Buffer
store storage.ExternalStorage
Expand Down Expand Up @@ -192,24 +191,23 @@ func (r *byteReader) switchToConcurrentReader() error {
return nil
}

// readNBytes reads the next n bytes from the reader and returns a buffer slice containing those bytes.
// The returned slice (pointer) can not be used after r.reset. In the same interval of r.reset,
// byteReader guarantees that the returned slice (pointer) will point to the same content
// though the slice may be changed.
func (r *byteReader) readNBytes(n int) (*[]byte, error) {
// readNBytes reads the next n bytes from the reader and returns a buffer slice
// containing those bytes. The content of returned slice may be changed after
// next call.
func (r *byteReader) readNBytes(n int) ([]byte, error) {
b := r.next(n)
readLen := len(b)
if readLen == n {
ret := &b
r.retPointers = append(r.retPointers, ret)
return ret, nil
return b, nil
}
// If the reader has fewer than n bytes remaining in current buffer,
// `auxBuf` is used as a container instead.
if n > 1024*1024*1024 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it 1024*1024*1024?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a defensive coding, and the author of this line is @wjhuang2016 let's ask him

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if there is something wrong we can return an error instead of panic. Just choose a large number.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is better to do this outside of readNBytes() and log the file name & other info.

Could you change this to 1 * size.GB instead and add a comment for it?

return nil, errors.Errorf("read %d bytes from external storage, exceed max limit %d", n, 1024*1024*1024)
}
auxBuf := make([]byte, n)
copy(auxBuf, b)
for readLen < n {
r.cloneSlices()
err := r.reload()
switch err {
case nil:
Expand All @@ -225,24 +223,7 @@ func (r *byteReader) readNBytes(n int) (*[]byte, error) {
copy(auxBuf[readLen:], b)
readLen += len(b)
}
return &auxBuf, nil
}

func (r *byteReader) reset() {
for i := range r.retPointers {
r.retPointers[i] = nil
}
r.retPointers = r.retPointers[:0]
}

func (r *byteReader) cloneSlices() {
for i := range r.retPointers {
copied := make([]byte, len(*r.retPointers[i]))
copy(copied, *r.retPointers[i])
*r.retPointers[i] = copied
r.retPointers[i] = nil
}
r.retPointers = r.retPointers[:0]
return auxBuf, nil
}

func (r *byteReader) next(n int) []byte {
Expand Down
136 changes: 11 additions & 125 deletions br/pkg/lightning/backend/external/byte_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ func (s *mockExtStore) GetFileSize() (int64, error) {
}

func TestByteReader(t *testing.T) {
testByteReaderNormal(t, false)
testByteReaderNormal(t, true)
}

func testByteReaderNormal(t *testing.T, useConcurrency bool) {
st, clean := NewS3WithBucketAndPrefix(t, "test", "testprefix")
defer clean()

Expand Down Expand Up @@ -96,19 +91,17 @@ func testByteReaderNormal(t *testing.T, useConcurrency bool) {
// Test basic readNBytes() usage.
br, err = newByteReader(context.Background(), newRsc(), 3)
require.NoError(t, err)
y, err := br.readNBytes(2)
x, err = br.readNBytes(2)
require.NoError(t, err)
x = *y
require.Equal(t, 2, len(x))
require.Equal(t, byte('a'), x[0])
require.Equal(t, byte('b'), x[1])
require.NoError(t, br.Close())

br, err = newByteReader(context.Background(), newRsc(), 3)
require.NoError(t, err)
y, err = br.readNBytes(5) // Read all the data.
x, err = br.readNBytes(5) // Read all the data.
require.NoError(t, err)
x = *y
require.Equal(t, 5, len(x))
require.Equal(t, byte('e'), x[4])
require.NoError(t, br.Close())
Expand All @@ -124,69 +117,23 @@ func testByteReaderNormal(t *testing.T, useConcurrency bool) {
ms := &mockExtStore{src: []byte("abcdef")}
br, err = newByteReader(context.Background(), ms, 2)
require.NoError(t, err)
y, err = br.readNBytes(3)
x, err = br.readNBytes(3)
require.NoError(t, err)
// Pollute mockExtStore to verify if the slice is not affected.
copy(ms.src, []byte("xyz"))
x = *y
copy(ms.src, "xyz")
require.Equal(t, 3, len(x))
require.Equal(t, byte('c'), x[2])
require.NoError(t, br.Close())

ms = &mockExtStore{src: []byte("abcdef")}
br, err = newByteReader(context.Background(), ms, 2)
require.NoError(t, err)
y, err = br.readNBytes(2)
x, err = br.readNBytes(2)
require.NoError(t, err)
// Pollute mockExtStore to verify if the slice is not affected.
copy(ms.src, []byte("xyz"))
x = *y
copy(ms.src, "xyz")
require.Equal(t, 2, len(x))
require.Equal(t, byte('b'), x[1])
br.reset()
require.NoError(t, br.Close())
}

func TestByteReaderClone(t *testing.T) {
ms := &mockExtStore{src: []byte("0123456789")}
br, err := newByteReader(context.Background(), ms, 4)
require.NoError(t, err)
y1, err := br.readNBytes(2)
require.NoError(t, err)
y2, err := br.readNBytes(1)
require.NoError(t, err)
x1, x2 := *y1, *y2
require.Len(t, x1, 2)
require.Len(t, x2, 1)
require.Equal(t, byte('0'), x1[0])
require.Equal(t, byte('2'), x2[0])
require.NoError(t, br.reload()) // Perform a read to overwrite buffer.
x1, x2 = *y1, *y2
require.Len(t, x1, 2)
require.Len(t, x2, 1)
require.Equal(t, byte('4'), x1[0]) // Verify if the buffer is overwritten.
require.Equal(t, byte('6'), x2[0])
require.NoError(t, br.Close())

ms = &mockExtStore{src: []byte("0123456789")}
br, err = newByteReader(context.Background(), ms, 4)
require.NoError(t, err)
y1, err = br.readNBytes(2)
require.NoError(t, err)
y2, err = br.readNBytes(1)
require.NoError(t, err)
x1, x2 = *y1, *y2
require.Len(t, x1, 2)
require.Len(t, x2, 1)
require.Equal(t, byte('0'), x1[0])
require.Equal(t, byte('2'), x2[0])
br.cloneSlices()
require.NoError(t, br.reload()) // Perform a read to overwrite buffer.
x1, x2 = *y1, *y2
require.Len(t, x1, 2)
require.Len(t, x2, 1)
require.Equal(t, byte('0'), x1[0]) // Verify if the buffer is NOT overwritten.
require.Equal(t, byte('2'), x2[0])
require.NoError(t, br.Close())
}

Expand All @@ -196,78 +143,17 @@ func TestByteReaderAuxBuf(t *testing.T) {
require.NoError(t, err)
y1, err := br.readNBytes(1)
require.NoError(t, err)
require.Equal(t, []byte("0"), y1)
y2, err := br.readNBytes(2)
require.NoError(t, err)
require.Equal(t, []byte("0"), *y1)
require.Equal(t, []byte("12"), *y2)
require.Equal(t, []byte("12"), y2)

y3, err := br.readNBytes(1)
require.NoError(t, err)
require.Equal(t, []byte("3"), y3)
y4, err := br.readNBytes(2)
require.NoError(t, err)
require.Equal(t, []byte("3"), *y3)
require.Equal(t, []byte("45"), *y4)
require.Equal(t, []byte("0"), *y1)
require.Equal(t, []byte("12"), *y2)
}

func TestReset(t *testing.T) {
testReset(t, false)
testReset(t, true)
}

func testReset(t *testing.T, useConcurrency bool) {
st, clean := NewS3WithBucketAndPrefix(t, "test", "testprefix")
defer func() {
clean()
}()

seed := time.Now().Unix()
rand.Seed(uint64(seed))
t.Logf("seed: %d", seed)
src := make([]byte, 256)
for i := range src {
src[i] = byte(i)
}
// Prepare
err := st.WriteFile(context.Background(), "testfile", src)
require.NoError(t, err)

newRsc := func() storage.ExternalFileReader {
rsc, err := st.Open(context.Background(), "testfile", nil)
require.NoError(t, err)
return rsc
}
bufSize := rand.Intn(256)
br, err := newByteReader(context.Background(), newRsc(), bufSize)
require.NoError(t, err)
end := 0
toCheck := make([]*[]byte, 0, 10)
for end < len(src) {
n := rand.Intn(len(src) - end)
if n == 0 {
n = 1
}
y, err := br.readNBytes(n)
require.NoError(t, err)
toCheck = append(toCheck, y)
end += n

l := end
r := end
for i := len(toCheck) - 1; i >= 0; i-- {
l -= len(*toCheck[i])
require.Equal(t, src[l:r], *toCheck[i])
r = l
}

if rand.Intn(2) == 0 {
br.reset()
toCheck = toCheck[:0]
}
}
_, err = br.readNBytes(1)
require.Equal(t, io.EOF, err)
require.Equal(t, []byte("45"), y4)
}

func TestUnexpectedEOF(t *testing.T) {
Expand Down Expand Up @@ -366,7 +252,7 @@ func TestSwitchMode(t *testing.T) {
break
}
require.NoError(t, err)
totalCnt += len(*y)
totalCnt += len(y)
}
require.Equal(t, fileSize, totalCnt)

Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ func split[T any](in []T, groupNum int) [][]T {

func (e *Engine) getAdjustedConcurrency() int {
if e.checkHotspot {
// estimate we will open at most 1000 files, so if e.dataFiles is small we can
// estimate we will open at most 8000 files, so if e.dataFiles is small we can
// try to concurrently process ranges.
adjusted := maxCloudStorageConnections / len(e.dataFiles)
return min(adjusted, 8)
return min(adjusted, 16)
}
adjusted := min(e.workerConcurrency, maxCloudStorageConnections/len(e.dataFiles))
return max(adjusted, 1)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func TestGetAdjustedConcurrency(t *testing.T) {
workerConcurrency: 32,
dataFiles: genFiles(100),
}
require.Equal(t, 8, e.getAdjustedConcurrency())
require.Equal(t, 16, e.getAdjustedConcurrency())
e.dataFiles = genFiles(8000)
require.Equal(t, 1, e.getAdjustedConcurrency())

Expand Down
15 changes: 8 additions & 7 deletions br/pkg/lightning/backend/external/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,25 +50,26 @@ func NewKeyValueStore(
}

// addEncodedData saves encoded key-value pairs to the KeyValueStore.
// data layout: keyLen + key + valueLen + value. If the accumulated
// data layout: keyLen + valueLen + key + value. If the accumulated
// size or key count exceeds the given distance, a new range property will be
// appended to the rangePropertiesCollector with current status.
// `key` must be in strictly ascending order for invocations of a KeyValueStore.
func (s *KeyValueStore) addEncodedData(val []byte) error {
_, err := s.dataWriter.Write(s.ctx, val)
func (s *KeyValueStore) addEncodedData(data []byte) error {
_, err := s.dataWriter.Write(s.ctx, data)
if err != nil {
return err
}

keyLen := binary.BigEndian.Uint64(val)
key := val[lengthBytes : lengthBytes+keyLen]
keyLen := binary.BigEndian.Uint64(data)
key := data[2*lengthBytes : 2*lengthBytes+keyLen]

if len(s.rc.currProp.firstKey) == 0 {
s.rc.currProp.firstKey = key
}
s.rc.currProp.lastKey = key

s.offset += uint64(len(val))
s.rc.currProp.size += uint64(len(val) - lengthBytes*2)
s.offset += uint64(len(data))
s.rc.currProp.size += uint64(len(data) - 2*lengthBytes)
s.rc.currProp.keys++

if s.rc.currProp.size >= s.rc.propSizeDist ||
Expand Down
13 changes: 12 additions & 1 deletion br/pkg/lightning/backend/external/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package external

import (
"context"
"encoding/binary"
"io"
"testing"
"time"
Expand All @@ -25,6 +26,15 @@ import (
"golang.org/x/exp/rand"
)

func getEncodedData(key, value []byte) []byte {
buf := make([]byte, 8*2+len(key)+len(value))
binary.BigEndian.PutUint64(buf, uint64(len(key)))
binary.BigEndian.PutUint64(buf[8:], uint64(len(value)))
copy(buf[8*2:], key)
copy(buf[8*2+len(key):], value)
return buf
}

func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
ctx := context.Background()
memStore := storage.NewMemStorage()
Expand Down Expand Up @@ -71,9 +81,9 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
require.NoError(t, err)
require.Len(t, rc.props, 1)

kvStore.Close()
err = writer.Close(ctx)
require.NoError(t, err)
kvStore.Close()
expected = &rangeProperty{
firstKey: k3,
lastKey: k3,
Expand Down Expand Up @@ -152,6 +162,7 @@ func TestKVReadWrite(t *testing.T) {
err = kvStore.addEncodedData(getEncodedData(keys[i], values[i]))
require.NoError(t, err)
}
kvStore.Close()
err = writer.Close(ctx)
require.NoError(t, err)

Expand Down
Loading