lightning: remove the unnecessary offset field in the key encoding for duplicate detection #29975

Merged · merged 9 commits · Dec 28, 2021
add dupDBIter
sleepymole committed Dec 24, 2021
commit 5ffef1ebe447556f5da7b4437863373ad9658575
17 changes: 17 additions & 0 deletions br/pkg/lightning/backend/local/engine.go
@@ -33,6 +33,7 @@ import (
"github.com/google/btree"
"github.com/google/uuid"
"github.com/pingcap/errors"
pkgkv "github.com/pingcap/tidb/br/pkg/kv"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
@@ -965,6 +966,22 @@ func (e *Engine) unfinishedRanges(ranges []Range) []Range {
return filterOverlapRange(ranges, e.finishedRanges.ranges)
}

func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) pkgkv.Iter {
if bytes.Compare(opts.LowerBound, normalIterStartKey) < 0 {
newOpts := *opts
newOpts.LowerBound = normalIterStartKey
opts = &newOpts
}
if !e.duplicateDetection {
return pebbleIter{Iterator: e.db.NewIter(opts)}
}
logger := log.With(
zap.String("table", common.UniqueTable(e.tableInfo.DB, e.tableInfo.Name)),
zap.Int64("tableID", e.tableInfo.ID),
zap.Stringer("engineUUID", e.UUID))
return newDupDetectIter(ctx, e.db, e.keyAdapter, opts, e.duplicateDB, logger)
}

type sstMeta struct {
path string
minKey []byte
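For orientation, here is a minimal, illustrative sketch of how the new Engine.newKVIter helper above might be consumed from elsewhere in package local. scanEngineRange and handlePair are hypothetical names used only for this sketch, and error handling is abbreviated.

// scanEngineRange is an illustrative helper, not part of this change: it walks
// an engine's key range through the unified iterator interface returned by
// newKVIter. With duplicate detection enabled, duplicated keys seen during the
// scan are recorded into the engine's duplicate DB as a side effect.
func scanEngineRange(ctx context.Context, e *Engine, start, end []byte,
	handlePair func(key, val []byte)) error {
	iter := e.newKVIter(ctx, &pebble.IterOptions{LowerBound: start, UpperBound: end})
	// Close error is ignored in this sketch; production code should check it.
	defer iter.Close()
	for iter.First(); iter.Valid(); iter.Next() {
		handlePair(iter.Key(), iter.Value())
	}
	return iter.Error()
}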
136 changes: 99 additions & 37 deletions br/pkg/lightning/backend/local/iterator.go
@@ -17,17 +17,14 @@ package local
import (
"bytes"
"context"
"math"

"github.com/cockroachdb/pebble"
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/pingcap/tidb/br/pkg/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/util/codec"
"go.uber.org/multierr"
)

type pebbleIter struct {
@@ -55,16 +52,15 @@ type dupDetectIter struct {
nextKey []byte
err error

engine *Engine
keyAdapter KeyAdapter
writeBatch *pebble.Batch
writeBatchSize int64
logger log.Logger
}

func (d *dupDetectIter) Seek(key []byte) bool {
encodedKey := d.keyAdapter.Encode(nil, key, 0)
if d.err != nil || !d.iter.SeekGE(encodedKey) {
rawKey := d.keyAdapter.Encode(nil, key, 0)
if d.err != nil || !d.iter.SeekGE(rawKey) {
return false
}
d.fill()
@@ -99,13 +95,16 @@ func (d *dupDetectIter) flush() {
d.writeBatchSize = 0
}

func (d *dupDetectIter) record(key []byte, val []byte) {
d.engine.Duplicates.Inc()
d.err = d.writeBatch.Set(key, val, nil)
func (d *dupDetectIter) record(rawKey, key, val []byte) {
d.logger.Debug("[detect-dupe] local duplicate key detected",
logutil.Key("key", key),
logutil.Key("value", val),
logutil.Key("rawKey", rawKey))
d.err = d.writeBatch.Set(rawKey, val, nil)
if d.err != nil {
return
}
d.writeBatchSize += int64(len(key) + len(val))
d.writeBatchSize += int64(len(rawKey) + len(val))
if d.writeBatchSize >= maxDuplicateBatchSize {
d.flush()
}
@@ -124,15 +123,11 @@ func (d *dupDetectIter) Next() bool {
d.curVal = append(d.curVal[:0], d.iter.Value()...)
return true
}
d.logger.Debug("[detect-dupe] local duplicate key detected",
logutil.Key("key", d.curKey),
logutil.Key("prevValue", d.curVal),
logutil.Key("value", d.iter.Value()))
if !recordFirst {
d.record(d.curRawKey, d.curVal)
d.record(d.curRawKey, d.curKey, d.curVal)
recordFirst = true
}
d.record(d.iter.Key(), d.iter.Value())
d.record(d.iter.Key(), d.nextKey, d.iter.Value())
}
if d.err == nil {
d.err = d.ctx.Err()
@@ -170,36 +165,103 @@ func (d *dupDetectIter) OpType() sst.Pair_OP {

var _ kv.Iter = &dupDetectIter{}

func newDupDetectIter(ctx context.Context, engine *Engine, opts *pebble.IterOptions) kv.Iter {
func newDupDetectIter(ctx context.Context, db *pebble.DB, keyAdapter KeyAdapter,
opts *pebble.IterOptions, dupDB *pebble.DB, logger log.Logger) *dupDetectIter {
newOpts := &pebble.IterOptions{TableFilter: opts.TableFilter}
if len(opts.LowerBound) > 0 {
newOpts.LowerBound = codec.EncodeBytes(nil, opts.LowerBound)
newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, math.MinInt64)
}
if len(opts.UpperBound) > 0 {
newOpts.UpperBound = codec.EncodeBytes(nil, opts.UpperBound)
newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, math.MinInt64)
}
logger := log.With(
zap.String("table", common.UniqueTable(engine.tableInfo.DB, engine.tableInfo.Name)),
zap.Int64("tableID", engine.tableInfo.ID),
zap.Stringer("engineUUID", engine.UUID))
return &dupDetectIter{
ctx: ctx,
iter: engine.db.NewIter(newOpts),
engine: engine,
keyAdapter: engine.keyAdapter,
writeBatch: engine.duplicateDB.NewBatch(),
iter: db.NewIter(newOpts),
keyAdapter: keyAdapter,
writeBatch: dupDB.NewBatch(),
logger: logger,
}
}

func newKVIter(ctx context.Context, engine *Engine, opts *pebble.IterOptions) kv.Iter {
if bytes.Compare(opts.LowerBound, normalIterStartKey) < 0 {
newOpts := *opts
newOpts.LowerBound = normalIterStartKey
opts = &newOpts
type dupDBIter struct {
iter *pebble.Iterator
keyAdapter KeyAdapter
curKey []byte
err error
}

func (d *dupDBIter) Seek(key []byte) bool {
rawKey := d.keyAdapter.Encode(nil, key, 0)
if d.err != nil || !d.iter.SeekGE(rawKey) {
return false
}
d.curKey, d.err = d.keyAdapter.Decode(d.curKey[:0], d.iter.Key())
return d.err == nil
}

func (d *dupDBIter) Error() error {
if d.err != nil {
return d.err
}
return d.iter.Error()
}

func (d *dupDBIter) First() bool {
if d.err != nil || !d.iter.First() {
return false
}
d.curKey, d.err = d.keyAdapter.Decode(d.curKey[:0], d.iter.Key())
return d.err == nil
}

func (d *dupDBIter) Last() bool {
if d.err != nil || !d.iter.Last() {
return false
}
d.curKey, d.err = d.keyAdapter.Decode(d.curKey[:0], d.iter.Key())
return d.err == nil
}

func (d *dupDBIter) Valid() bool {
return d.err == nil && d.iter.Valid()
}

func (d *dupDBIter) Next() bool {
if d.err != nil || !d.iter.Next() {
return false
}
d.curKey, d.err = d.keyAdapter.Decode(d.curKey[:0], d.iter.Key())
return d.err == nil
}

func (d *dupDBIter) Key() []byte {
return d.curKey
}

func (d *dupDBIter) Value() []byte {
return d.iter.Value()
}

func (d *dupDBIter) Close() error {
return d.iter.Close()
}

func (d *dupDBIter) OpType() sst.Pair_OP {
return sst.Pair_Put
}

var _ kv.Iter = &dupDBIter{}

func newDupDBIter(dupDB *pebble.DB, keyAdapter KeyAdapter, opts *pebble.IterOptions) *dupDBIter {
newOpts := &pebble.IterOptions{TableFilter: opts.TableFilter}
if len(opts.LowerBound) > 0 {
newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, math.MinInt64)
}
if len(opts.UpperBound) > 0 {
newOpts.UpperBound = keyAdapter.Encode(nil, opts.UpperBound, math.MinInt64)
}
if !engine.duplicateDetection {
return pebbleIter{Iterator: engine.db.NewIter(opts)}
return &dupDBIter{
iter: dupDB.NewIter(newOpts),
keyAdapter: keyAdapter,
}
return newDupDetectIter(ctx, engine, opts)
}
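The read path is symmetric. Below is a minimal sketch (collectDuplicates and visit are hypothetical names used only for illustration) of draining the duplicate DB through newDupDBIter, which hands back keys already decoded by the KeyAdapter, the same way the updated test below consumes it.

// collectDuplicates is illustrative only: it visits every recorded duplicate,
// relying on dupDBIter to decode the stored keys back into raw user keys.
func collectDuplicates(dupDB *pebble.DB, adapter KeyAdapter, visit func(key, val []byte)) error {
	iter := newDupDBIter(dupDB, adapter, &pebble.IterOptions{})
	for iter.First(); iter.Valid(); iter.Next() {
		visit(iter.Key(), iter.Value())
	}
	err := iter.Error()
	if closeErr := iter.Close(); err == nil {
		err = closeErr
	}
	return err
}

One design detail worth noting in both constructors: the caller's LowerBound/UpperBound are plain keys, while both DBs store adapter-encoded keys, so the bounds are re-encoded with math.MinInt64 as the row ID. Assuming the adapter orders encoded keys first by key and then by row ID, this keeps the encoded lower bound at or before every encoded variant of its key and the exclusive upper bound ahead of every variant of its key, so no candidate row is skipped or leaked.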
69 changes: 23 additions & 46 deletions br/pkg/lightning/backend/local/iterator_test.go
@@ -18,15 +18,15 @@ import (
"bytes"
"context"
"math/rand"
"os"
"path/filepath"
"sort"
"testing"
"time"

"github.com/cockroachdb/pebble"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/stretchr/testify/require"
)

@@ -84,7 +84,7 @@ func TestDupDetectIterator(t *testing.T) {
}

// Find duplicates from the generated pairs.
var duplicatePairs []common.KvPair
var dupPairs []common.KvPair
sort.Slice(pairs, func(i, j int) bool {
return bytes.Compare(pairs[i].Key, pairs[j].Key) < 0
})
@@ -100,7 +100,7 @@
continue
}
for k := i; k < j; k++ {
duplicatePairs = append(duplicatePairs, pairs[k])
dupPairs = append(dupPairs, pairs[k])
}
i = j
}
@@ -112,9 +112,7 @@
rnd.Shuffle(len(pairs), func(i, j int) {
pairs[i], pairs[j] = pairs[j], pairs[i]
})
storeDir, err := os.MkdirTemp("", "lightning-test")
require.NoError(t, err)
defer os.RemoveAll(storeDir)
storeDir := t.TempDir()
db, err := pebble.Open(filepath.Join(storeDir, "kv"), &pebble.Options{})
require.NoError(t, err)
wb := db.NewBatch()
@@ -124,19 +122,10 @@
}
require.NoError(t, wb.Commit(pebble.Sync))

duplicateDB, err := pebble.Open(filepath.Join(storeDir, "duplicates"), &pebble.Options{})
dupDB, err := pebble.Open(filepath.Join(storeDir, "duplicates"), &pebble.Options{})
require.NoError(t, err)
engine := &Engine{
ctx: context.Background(),
db: db,
keyAdapter: keyAdapter,
duplicateDB: duplicateDB,
tableInfo: &checkpoints.TidbTableInfo{
DB: "db",
Name: "name",
},
}
iter := newDupDetectIter(context.Background(), engine, &pebble.IterOptions{})
var iter kv.Iter
iter = newDupDetectIter(context.Background(), db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L())
sort.Slice(pairs, func(i, j int) bool {
key1 := keyAdapter.Encode(nil, pairs[i].Key, pairs[i].RowID)
key2 := keyAdapter.Encode(nil, pairs[j].Key, pairs[j].RowID)
@@ -163,35 +152,33 @@
require.NoError(t, iter.Error())
require.Equal(t, 0, len(uniqueKeys))
require.NoError(t, iter.Close())
require.NoError(t, engine.Close())
require.NoError(t, db.Close())

// Check duplicates detected by duplicate iterator.
iter = pebbleIter{Iterator: duplicateDB.NewIter(&pebble.IterOptions{})}
// Check duplicates detected by dupDetectIter.
iter = newDupDBIter(dupDB, keyAdapter, &pebble.IterOptions{})
var detectedPairs []common.KvPair
for iter.First(); iter.Valid(); iter.Next() {
key, err := keyAdapter.Decode(nil, iter.Key())
require.NoError(t, err)
detectedPairs = append(detectedPairs, common.KvPair{
Key: key,
Key: append([]byte{}, iter.Key()...),
Val: append([]byte{}, iter.Value()...),
})
}
require.NoError(t, iter.Error())
require.NoError(t, iter.Close())
require.NoError(t, duplicateDB.Close())
require.Equal(t, len(duplicatePairs), len(detectedPairs))
require.NoError(t, dupDB.Close())
require.Equal(t, len(dupPairs), len(detectedPairs))

sort.Slice(duplicatePairs, func(i, j int) bool {
keyCmp := bytes.Compare(duplicatePairs[i].Key, duplicatePairs[j].Key)
return keyCmp < 0 || keyCmp == 0 && bytes.Compare(duplicatePairs[i].Val, duplicatePairs[j].Val) < 0
sort.Slice(dupPairs, func(i, j int) bool {
keyCmp := bytes.Compare(dupPairs[i].Key, dupPairs[j].Key)
return keyCmp < 0 || keyCmp == 0 && bytes.Compare(dupPairs[i].Val, dupPairs[j].Val) < 0
})
sort.Slice(detectedPairs, func(i, j int) bool {
keyCmp := bytes.Compare(detectedPairs[i].Key, detectedPairs[j].Key)
return keyCmp < 0 || keyCmp == 0 && bytes.Compare(detectedPairs[i].Val, detectedPairs[j].Val) < 0
})
for i := 0; i < len(detectedPairs); i++ {
require.Equal(t, duplicatePairs[i].Key, detectedPairs[i].Key)
require.Equal(t, duplicatePairs[i].Val, detectedPairs[i].Val)
require.Equal(t, dupPairs[i].Key, detectedPairs[i].Key)
require.Equal(t, dupPairs[i].Val, detectedPairs[i].Val)
}
}

@@ -233,25 +220,15 @@ func TestDupDetectIterSeek(t *testing.T) {
}
require.NoError(t, wb.Commit(pebble.Sync))

duplicateDB, err := pebble.Open(filepath.Join(storeDir, "duplicates"), &pebble.Options{})
dupDB, err := pebble.Open(filepath.Join(storeDir, "duplicates"), &pebble.Options{})
require.NoError(t, err)
engine := &Engine{
ctx: context.Background(),
db: db,
keyAdapter: keyAdapter,
duplicateDB: duplicateDB,
tableInfo: &checkpoints.TidbTableInfo{
DB: "db",
Name: "name",
},
}
iter := newDupDetectIter(context.Background(), engine, &pebble.IterOptions{})
iter := newDupDetectIter(context.Background(), db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L())

require.True(t, iter.Seek([]byte{1, 2, 3, 1}))
require.Equal(t, pairs[1].Val, iter.Value())
require.True(t, iter.Next())
require.Equal(t, pairs[3].Val, iter.Value())
require.NoError(t, iter.Close())
require.NoError(t, engine.Close())
require.NoError(t, duplicateDB.Close())
require.NoError(t, db.Close())
require.NoError(t, dupDB.Close())
}
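As a final illustration, a sketch of the Encode/Decode round trip that both dupDetectIter and dupDBIter rely on. dupDetectKeyAdapter is assumed to be the KeyAdapter implementation the tests above construct; this test function is hypothetical and not part of the commit.

func TestKeyAdapterRoundTripSketch(t *testing.T) {
	adapter := dupDetectKeyAdapter{} // assumed duplicate-detection adapter type
	key := []byte("raw-user-key")
	// Encode appends a row-ID component; Decode must return the original key.
	encoded := adapter.Encode(nil, key, 7)
	decoded, err := adapter.Decode(nil, encoded)
	require.NoError(t, err)
	require.Equal(t, key, decoded)
}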