Merge remote-tracking branch 'origin/master' into change-mock
D3Hunter committed Aug 25, 2023
2 parents 1de1f53 + 9b7c1e2 commit 0114b1a
Showing 85 changed files with 1,140 additions and 880 deletions.
25 changes: 19 additions & 6 deletions DEPS.bzl
@@ -5780,13 +5780,13 @@ def go_deps():
name = "com_github_pingcap_tipb",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/tipb",
sha256 = "74b710db49c16c0400c44d14e7d2e23257d094d7cf39c71e5715bb5ac59fd57a",
strip_prefix = "github.com/pingcap/tipb@v0.0.0-20230802082933-256c189cd860",
sha256 = "d6e086c68505edca6fba1a6264dadbab29abc85722d520355c485da901dfdb41",
strip_prefix = "github.com/pingcap/tipb@v0.0.0-20230822064221-711da6fede03",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20230802082933-256c189cd860.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20230802082933-256c189cd860.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20230802082933-256c189cd860.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20230802082933-256c189cd860.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20230822064221-711da6fede03.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20230822064221-711da6fede03.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20230822064221-711da6fede03.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20230822064221-711da6fede03.zip",
],
)
go_repository(
@@ -10382,6 +10382,19 @@ def go_deps():
"https://storage.googleapis.com/pingcapmirror/gomod/go.uber.org/goleak/org_uber_go_goleak-v1.2.1.zip",
],
)
go_repository(
name = "org_uber_go_mock",
build_file_proto_mode = "disable_global",
importpath = "go.uber.org/mock",
sha256 = "df840a589119d0c1966e3f8888fb6b6a05b4aa793b1074c3fd4c4a508e0b0e3a",
strip_prefix = "go.uber.org/mock@v0.2.0",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/go.uber.org/mock/org_uber_go_mock-v0.2.0.zip",
"http://ats.apps.svc/gomod/go.uber.org/mock/org_uber_go_mock-v0.2.0.zip",
"https://cache.hawkingrei.com/gomod/go.uber.org/mock/org_uber_go_mock-v0.2.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/go.uber.org/mock/org_uber_go_mock-v0.2.0.zip",
],
)
go_repository(
name = "org_uber_go_multierr",
build_file_proto_mode = "disable_global",
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/BUILD.bazel
@@ -32,10 +32,10 @@ go_test(
"//br/pkg/mock",
"//parser/mysql",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_golang_mock//gomock",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@org_uber_go_mock//gomock",
],
)
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/backend_test.go
@@ -7,7 +7,6 @@ import (
"time"

gmysql "github.com/go-sql-driver/mysql"
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
@@ -16,6 +15,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/mock/gomock"
)

type backendSuite struct {
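The dependency and import changes above track the move of gomock from the archived github.com/golang/mock repository to its maintained fork, go.uber.org/mock; the gomock API is unchanged, so only the import path differs. A minimal sketch of the pattern these tests keep using with the new path (the commented-out mock type is illustrative, not taken from this diff):

package backend_test

import (
	"testing"

	"go.uber.org/mock/gomock" // previously github.com/golang/mock/gomock
)

func TestWithNewGomockImport(t *testing.T) {
	// NewController ties expectations to the test; with go.uber.org/mock it
	// registers Finish through t.Cleanup, so no explicit defer is required.
	ctrl := gomock.NewController(t)
	_ = ctrl
	// mockBackend := mock.NewMockBackend(ctrl) // hypothetical generated mock
	// mockBackend.EXPECT().Close().Times(1)
}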
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
@@ -50,12 +50,13 @@ go_test(
],
embed = [":external"],
flaky = True,
shard_count = 20,
shard_count = 21,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/backend/local",
"//br/pkg/lightning/common",
"//br/pkg/storage",
"//kv",
"//util/codec",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_pingcap_errors//:errors",
1 change: 0 additions & 1 deletion br/pkg/lightning/backend/external/byte_reader.go
@@ -133,7 +133,6 @@ func (r *byteReader) reload() error {
case io.ErrUnexpectedEOF:
// The last batch.
r.buf = r.buf[:nBytes]
break
default:
logutil.Logger(r.ctx).Warn("other error during reload", zap.Error(err))
return err
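The deleted break in the io.ErrUnexpectedEOF case was a no-op: Go switch cases never fall through, so each case ends on its own. A self-contained sketch of the same control flow (the function name is illustrative):

package main

import (
	"fmt"
	"io"
)

// classifyReload mirrors the shape of the switch in reload; every case exits
// the switch by itself, so a trailing break adds nothing.
func classifyReload(err error) string {
	switch err {
	case nil:
		return "ok"
	case io.ErrUnexpectedEOF:
		return "last batch"
	default:
		return "other error"
	}
}

func main() {
	fmt.Println(classifyReload(io.ErrUnexpectedEOF)) // last batch
}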
8 changes: 2 additions & 6 deletions br/pkg/lightning/backend/external/byte_reader_test.go
@@ -93,9 +93,7 @@ func TestByteReader(t *testing.T) {
y, err = br.readNBytes(3)
require.NoError(t, err)
// Pollute mockExtStore to verify if the slice is not affected.
for i, b := range []byte{'x', 'y', 'z'} {
ms.src[i] = b
}
copy(ms.src, []byte("xyz"))
x = *y
require.Equal(t, 3, len(x))
require.Equal(t, byte('c'), x[2])
@@ -107,9 +105,7 @@ func TestByteReader(t *testing.T) {
y, err = br.readNBytes(2)
require.NoError(t, err)
// Pollute mockExtStore to verify if the slice is not affected.
for i, b := range []byte{'x', 'y', 'z'} {
ms.src[i] = b
}
copy(ms.src, []byte("xyz"))
x = *y
require.Equal(t, 2, len(x))
require.Equal(t, byte('b'), x[1])
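Swapping the element-by-element loop for copy is the idiomatic way to overwrite a slice prefix: copy writes at most the length of the shorter slice and returns the number of bytes copied. A standalone illustration:

package main

import "fmt"

func main() {
	buf := []byte("abcdef")
	// copy overwrites only the first len("xyz") bytes and leaves the rest intact,
	// matching the behaviour of the loop it replaces in the test.
	n := copy(buf, []byte("xyz"))
	fmt.Println(n, string(buf)) // 3 xyzdef
}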
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/engine_test.go
@@ -63,7 +63,7 @@ func TestIter(t *testing.T) {
SetMemorySizeLimit(uint64(rand.Intn(100)+1)).
SetPropSizeDistance(uint64(rand.Intn(50)+1)).
SetPropKeysDistance(uint64(rand.Intn(10)+1)).
Build(store, i, "/subtask")
Build(store, "/subtask", i)
kvStart := i * 100
kvEnd := (i + 1) * 100
err := w.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvPairs[kvStart:kvEnd]))
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/external/iter_test.go
@@ -208,6 +208,7 @@ func TestCorruptContent(t *testing.T) {
}
if i == 0 {
_, err = writer.Write(ctx, []byte("corrupt"))
require.NoError(t, err)
}
err = writer.Close(ctx)
require.NoError(t, err)
9 changes: 5 additions & 4 deletions br/pkg/lightning/backend/external/util_test.go
@@ -92,7 +92,7 @@ func TestSeekPropsOffsets(t *testing.T) {
got, err = seekPropsOffsets(ctx, []byte("key3"), []string{file1, file2}, store)
require.NoError(t, err)
require.Equal(t, []uint64{30, 20}, got)
got, err = seekPropsOffsets(ctx, []byte("key0"), []string{file1, file2}, store)
_, err = seekPropsOffsets(ctx, []byte("key0"), []string{file1, file2}, store)
require.ErrorContains(t, err, "start key 6b657930 is too small for stat files [/test1 /test2]")
got, err = seekPropsOffsets(ctx, []byte("key1"), []string{file1, file2}, store)
require.NoError(t, err)
@@ -105,6 +105,7 @@ func TestSeekPropsOffsets(t *testing.T) {
w3, err := store.Create(ctx, file3, nil)
require.NoError(t, err)
err = w3.Close(ctx)
require.NoError(t, err)

file4 := "/test4"
w4, err := store.Create(ctx, file4, nil)
@@ -125,7 +126,7 @@ func TestGetAllFileNames(t *testing.T) {
SetMemorySizeLimit(20).
SetPropSizeDistance(5).
SetPropKeysDistance(3).
Build(store, 0, "/subtask")
Build(store, "/subtask", 0)
kvPairs := make([]common.KvPair, 0, 30)
for i := 0; i < 30; i++ {
kvPairs = append(kvPairs, common.KvPair{
@@ -142,7 +143,7 @@ func TestGetAllFileNames(t *testing.T) {
SetMemorySizeLimit(20).
SetPropSizeDistance(5).
SetPropKeysDistance(3).
Build(store, 3, "/subtask")
Build(store, "/subtask", 3)
err = w2.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvPairs))
require.NoError(t, err)
_, err = w2.Close(ctx)
@@ -152,7 +153,7 @@ func TestGetAllFileNames(t *testing.T) {
SetMemorySizeLimit(20).
SetPropSizeDistance(5).
SetPropKeysDistance(3).
Build(store, 12, "/subtask")
Build(store, "/subtask", 12)
err = w3.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvPairs))
require.NoError(t, err)
_, err = w3.Close(ctx)
32 changes: 25 additions & 7 deletions br/pkg/lightning/backend/external/writer.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
@@ -74,11 +75,12 @@ func DummyOnCloseFunc(*WriterSummary) {}

// WriterBuilder builds a new Writer.
type WriterBuilder struct {
memSizeLimit uint64
writeBatchCount uint64
propSizeDist uint64
propKeysDist uint64
onClose OnCloseFunc
memSizeLimit uint64
writeBatchCount uint64
propSizeDist uint64
propKeysDist uint64
onClose OnCloseFunc
dupeDetectEnabled bool

bufferPool *membuf.Pool
}
@@ -132,18 +134,28 @@ func (b *WriterBuilder) SetBufferPool(bufferPool *membuf.Pool) *WriterBuilder {
return b
}

// EnableDuplicationDetection enables the duplication detection of the writer.
func (b *WriterBuilder) EnableDuplicationDetection() *WriterBuilder {
b.dupeDetectEnabled = true
return b
}

// Build builds a new Writer. The files writer will create are under the prefix
// of "{prefix}/{writerID}".
func (b *WriterBuilder) Build(
store storage.ExternalStorage,
writerID int,
prefix string,
writerID int,
) *Writer {
bp := b.bufferPool
if bp == nil {
bp = membuf.NewPool()
}
filenamePrefix := filepath.Join(prefix, strconv.Itoa(writerID))
keyAdapter := local.KeyAdapter(local.NoopKeyAdapter{})
if b.dupeDetectEnabled {
keyAdapter = local.DupDetectKeyAdapter{}
}
return &Writer{
rc: &rangePropertiesCollector{
props: make([]*rangeProperty, 0, 1024),
@@ -157,6 +169,7 @@ func (b *WriterBuilder) Build(
writeBatch: make([]common.KvPair, 0, b.writeBatchCount),
currentSeq: 0,
filenamePrefix: filenamePrefix,
keyAdapter: keyAdapter,
writerID: writerID,
kvStore: nil,
onClose: b.onClose,
@@ -170,6 +183,7 @@ type Writer struct {
writerID int
currentSeq int
filenamePrefix string
keyAdapter local.KeyAdapter

kvStore *KeyValueStore
rc *rangePropertiesCollector
@@ -195,10 +209,14 @@ type Writer struct {
// Note that this method is NOT thread-safe.
func (w *Writer) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error {
kvs := kv.Rows2KvPairs(rows)
keyAdapter := w.keyAdapter
for _, pair := range kvs {
w.batchSize += uint64(len(pair.Key) + len(pair.Val))
key := w.kvBuffer.AddBytes(pair.Key)

buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key, pair.RowID))
key := keyAdapter.Encode(buf[:0], pair.Key, pair.RowID)
val := w.kvBuffer.AddBytes(pair.Val)

w.writeBatch = append(w.writeBatch, common.KvPair{Key: key, Val: val})
if w.batchSize >= w.memSizeLimit {
if err := w.flushKVs(ctx); err != nil {
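Taken together, the writer.go changes reorder Build's parameters (prefix now comes before writerID) and add an opt-in duplicate-detection mode that encodes every appended key through local.DupDetectKeyAdapter before it is buffered. A sketch of the new call pattern, modeled on the tests in this diff and assumed to sit inside the external package where NewWriterBuilder, common, kv, storage, and dbkv (github.com/pingcap/tidb/kv) are already imported:

func writeWithDupDetection(ctx context.Context, store storage.ExternalStorage) error {
	w := NewWriterBuilder().
		SetMemorySizeLimit(1000).
		SetPropKeysDistance(2).
		EnableDuplicationDetection(). // keys go through local.DupDetectKeyAdapter
		Build(store, "/subtask", 0)   // new argument order: prefix, then writerID
	pairs := []common.KvPair{
		{Key: []byte("k1"), Val: []byte("v1"), RowID: dbkv.IntHandle(1).Encoded()},
	}
	if err := w.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(pairs)); err != nil {
		return err
	}
	_, err := w.Close(ctx)
	return err
}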
77 changes: 75 additions & 2 deletions br/pkg/lightning/backend/external/writer_test.go
@@ -19,14 +19,18 @@ import (
"context"
"fmt"
"io"
"path"
"slices"
"strings"
"testing"
"time"

"github.com/cockroachdb/pebble"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/storage"
dbkv "github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
)
@@ -41,7 +45,7 @@ func TestWriter(t *testing.T) {
writer := NewWriterBuilder().
SetPropSizeDistance(100).
SetPropKeysDistance(2).
Build(memStore, 0, "/test")
Build(memStore, "/test", 0)

kvCnt := rand.Intn(10) + 10
kvs := make([]common.KvPair, kvCnt)
@@ -103,7 +107,7 @@ func TestWriterFlushMultiFileNames(t *testing.T) {
writer := NewWriterBuilder().
SetPropKeysDistance(2).
SetMemorySizeLimit(60).
Build(memStore, 0, "/test")
Build(memStore, "/test", 0)

// 200 bytes key values.
kvCnt := 10
@@ -140,3 +144,72 @@ func TestWriterFlushMultiFileNames(t *testing.T) {
require.Equal(t, statFiles[i], fmt.Sprintf("/test/0_stat/%d", i))
}
}

func TestWriterDuplicateDetect(t *testing.T) {
ctx := context.Background()
memStore := storage.NewMemStorage()

writer := NewWriterBuilder().
SetPropKeysDistance(2).
SetMemorySizeLimit(1000).
EnableDuplicationDetection().
Build(memStore, "/test", 0)
kvCount := 20
kvs := make([]common.KvPair, 0, kvCount)
for i := 0; i < kvCount; i++ {
v := i
if v == kvCount/2 {
v-- // insert a duplicate key.
}
kvs = append(kvs, common.KvPair{
Key: []byte{byte(v)},
Val: []byte{byte(v)},
RowID: dbkv.IntHandle(i).Encoded(),
})
}
rows := kv.MakeRowsFromKvPairs(kvs)
err := writer.AppendRows(ctx, nil, rows)
require.NoError(t, err)
_, err = writer.Close(ctx)
require.NoError(t, err)

keys := make([][]byte, 0, kvCount)
values := make([][]byte, 0, kvCount)

kvReader, err := newKVReader(ctx, "/test/0/0", memStore, 0, 100)
require.NoError(t, err)
for i := 0; i < kvCount; i++ {
key, value, err := kvReader.nextKV()
require.NoError(t, err)
require.Equal(t, kvs[i].Val, value)
clonedKey := make([]byte, len(key))
copy(clonedKey, key)
clonedVal := make([]byte, len(value))
copy(clonedVal, value)
keys = append(keys, clonedKey)
values = append(values, clonedVal)
}
_, _, err = kvReader.nextKV()
require.Equal(t, io.EOF, err)

dir := t.TempDir()
db, err := pebble.Open(path.Join(dir, "duplicate"), nil)
require.NoError(t, err)
keyAdapter := local.DupDetectKeyAdapter{}
data := &MemoryIngestData{
keyAdapter: keyAdapter,
duplicateDetection: true,
duplicateDB: db,
dupDetectOpt: local.DupDetectOpt{ReportErrOnDup: true},
keys: keys,
values: values,
ts: 123,
}
iter := data.NewIter(ctx, nil, nil)

for iter.First(); iter.Valid(); iter.Next() {
}
err = iter.Error()
require.Error(t, err)
require.Contains(t, err.Error(), "found duplicate key")
}
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/BUILD.bazel
@@ -152,7 +152,6 @@ go_test(
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_docker_go_units//:go-units",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_golang_mock//gomock",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
@@ -170,5 +169,6 @@ go_test(
"@org_golang_google_grpc//encoding",
"@org_golang_google_grpc//status",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_mock//gomock",
],
)
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/disk_quota_test.go
@@ -17,11 +17,11 @@ package local
import (
"testing"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/mock/mocklocal"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

func TestCheckDiskQuota(t *testing.T) {