Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: remove the unnecessary offset field in the key encoding for duplicate detection #29975

Merged
9 commits merged on Dec 28, 2021
Next Next commit
lightning: remove the unnecessary offset field in the key encoding for duplicate detection
  • Loading branch information
sleepymole committed Nov 25, 2021
commit d5713524d8ed712ec5246a1c0ca60b075e0e2e09
1 change: 0 additions & 1 deletion br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,6 @@ func (kvcodec *tableKVEncoder) Encode(
kvPairs := kvcodec.se.takeKvPairs()
for i := 0; i < len(kvPairs.pairs); i++ {
kvPairs.pairs[i].RowID = rowID
kvPairs.pairs[i].Offset = offset
}
kvcodec.recordCache = record[:0]
return kvPairs, nil
Expand Down
72 changes: 30 additions & 42 deletions br/pkg/lightning/backend/kv/sql2kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,9 @@ func (s *kvSuite) TestEncode(c *C) {
c.Assert(err, IsNil)
c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Val: []uint8{0x8, 0x2, 0x8, 0x2},
RowID: 2,
Offset: 1234,
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Val: []uint8{0x8, 0x2, 0x8, 0x2},
RowID: 2,
},
}})

Expand All @@ -136,10 +135,9 @@ func (s *kvSuite) TestEncode(c *C) {
c.Assert(err, IsNil)
c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Val: []uint8{0x8, 0x2, 0x8, 0xfe, 0x1},
RowID: 1,
Offset: 1234,
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Val: []uint8{0x8, 0x2, 0x8, 0xfe, 0x1},
RowID: 1,
},
}})
}
Expand Down Expand Up @@ -262,8 +260,7 @@ func (s *kvSuite) TestEncodeRowFormatV2(c *C) {
0x1, 0x0, // not null offsets = [1]
0x7f, // column version = 127 (10000000 clamped to TINYINT)
},
RowID: 1,
Offset: 1234,
RowID: 1,
},
}})
}
Expand Down Expand Up @@ -300,10 +297,9 @@ func (s *kvSuite) TestEncodeTimestamp(c *C) {
c.Assert(err, IsNil)
c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x8, 0x2, 0x9, 0x80, 0x80, 0x80, 0xf0, 0xfd, 0x8e, 0xf7, 0xc0, 0x19},
RowID: 70,
Offset: 1234,
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x8, 0x2, 0x9, 0x80, 0x80, 0x80, 0xf0, 0xfd, 0x8e, 0xf7, 0xc0, 0x19},
RowID: 70,
},
}})
}
Expand All @@ -328,16 +324,14 @@ func (s *kvSuite) TestEncodeDoubleAutoIncrement(c *C) {
c.Assert(err, IsNil)
c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x8, 0x0, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
RowID: 70,
Offset: 1234,
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x8, 0x0, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
RowID: 70,
},
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Val: []uint8{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
RowID: 70,
Offset: 1234,
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Val: []uint8{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
RowID: 70,
},
}})
c.Assert(tbl.Allocators(encoder.(*tableKVEncoder).se).Get(autoid.AutoIncrementType).Base(), Equals, int64(70))
Expand Down Expand Up @@ -372,10 +366,9 @@ func (s *kvSuite) TestDefaultAutoRandoms(c *C) {
c.Assert(err, IsNil)
c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
RowID: 70,
Offset: 1234,
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
RowID: 70,
},
}})
c.Assert(tbl.Allocators(encoder.(*tableKVEncoder).se).Get(autoid.AutoRandomType).Base(), Equals, int64(70))
Expand All @@ -384,10 +377,9 @@ func (s *kvSuite) TestDefaultAutoRandoms(c *C) {
c.Assert(err, IsNil)
c.Assert(pairs, DeepEquals, &KvPairs{pairs: []common.KvPair{
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x47},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
RowID: 71,
Offset: 1234,
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x47},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
RowID: 71,
},
}})
c.Assert(tbl.Allocators(encoder.(*tableKVEncoder).se).Get(autoid.AutoRandomType).Base(), Equals, int64(71))
Expand Down Expand Up @@ -424,24 +416,20 @@ func (s *kvSuite) TestShardRowId(c *C) {
func (s *kvSuite) TestSplitIntoChunks(c *C) {
pairs := []common.KvPair{
{
Key: []byte{1, 2, 3},
Val: []byte{4, 5, 6},
Offset: 1000,
Key: []byte{1, 2, 3},
Val: []byte{4, 5, 6},
},
{
Key: []byte{7, 8},
Val: []byte{9, 0},
Offset: 2000,
Key: []byte{7, 8},
Val: []byte{9, 0},
},
{
Key: []byte{1, 2, 3, 4},
Val: []byte{5, 6, 7, 8},
Offset: 3000,
Key: []byte{1, 2, 3, 4},
Val: []byte{5, 6, 7, 8},
},
{
Key: []byte{9, 0},
Val: []byte{1, 2},
Offset: 4000,
Key: []byte{9, 0},
Val: []byte{1, 2},
},
}

Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func NewDuplicateManager(local *local, ts uint64, opts *kv.SessionOptions) (*Dup
regionConcurrency: local.tcpConcurrency,
splitCli: local.splitCli,
tikvCli: local.tikvCli,
keyAdapter: duplicateKeyAdapter{},
keyAdapter: dupDetectKeyAdapter{},
ts: ts,
connPool: common.NewGRPCConns(),
// TODO: not sure what is the correct concurrency value.
Expand Down Expand Up @@ -495,7 +495,7 @@ func (manager *DuplicateManager) CollectDuplicateRowsFromLocalIndex(

for iter.First(); iter.Valid(); iter.Next() {
hasDataConflict = true
rawKey, _, _, err := manager.keyAdapter.Decode(nil, iter.Key())
rawKey, err := manager.keyAdapter.Decode(nil, iter.Key())
if err != nil {
return err
}
Expand Down Expand Up @@ -570,7 +570,7 @@ func (manager *DuplicateManager) CollectDuplicateRowsFromLocalIndex(

for iter.First(); iter.Valid(); iter.Next() {
hasDataConflict = true
rawKey, _, _, err := manager.keyAdapter.Decode(nil, iter.Key())
rawKey, err := manager.keyAdapter.Decode(nil, iter.Key())
if err != nil {
indexLogger.Error(
"[detect-dupe] decode key error when query handle for duplicate index",
Expand Down
54 changes: 28 additions & 26 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (e *Engine) getSizeProperties() (*sizeProperties, error) {
newRangeProps := make(rangeProperties, 0, len(rangeProps))
for _, p := range rangeProps {
if !bytes.Equal(p.Key, engineMetaKey) {
p.Key, _, _, err = e.keyAdapter.Decode(nil, p.Key)
p.Key, err = e.keyAdapter.Decode(nil, p.Key)
if err != nil {
log.L().Warn(
"decodeRangeProperties failed because the props key is invalid",
Expand Down Expand Up @@ -995,8 +995,6 @@ type Writer struct {

batchCount int
batchSize int64
totalSize int64
totalCount int64

lastMetaSeq int32
}
Expand All @@ -1008,25 +1006,29 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
return errors.Trace(err)
}
w.writer = writer
w.writer.minKey = append([]byte{}, kvs[0].Key...)
}

totalKeyLen := 0
for i := 0; i < len(kvs); i++ {
totalKeyLen += w.engine.keyAdapter.EncodedLen(kvs[i].Key)
}
buf := make([]byte, totalKeyLen)
encodedKvs := make([]common.KvPair, len(kvs))
keyAdapter := w.engine.keyAdapter
totalKeySize := 0
for i := 0; i < len(kvs); i++ {
encodedKey := w.engine.keyAdapter.Encode(buf, kvs[i].Key, kvs[i].RowID, kvs[i].Offset)
buf = buf[len(encodedKey):]
encodedKvs[i] = common.KvPair{Key: encodedKey, Val: kvs[i].Val}
w.batchSize += int64(len(encodedKvs[i].Key) + len(encodedKvs[i].Val))
keySize := keyAdapter.EncodedLen(kvs[i].Key)
w.batchSize += int64(keySize + len(kvs[i].Val))
totalKeySize += keySize
}
w.batchCount += len(kvs)
// noopKeyAdapter doesn't really change the key,
// skipping the encoding to avoid unnecessary alloc and copy.
if _, ok := keyAdapter.(noopKeyAdapter); !ok {
buf := make([]byte, 0, totalKeySize)
newKvs := make([]common.KvPair, len(kvs))
for i := 0; i < len(kvs); i++ {
buf = keyAdapter.Encode(buf[:0], kvs[i].Key, kvs[i].RowID)
newKvs[i] = common.KvPair{Key: buf, Val: kvs[i].Val}
buf = buf[len(buf):]
}
kvs = newKvs
}

w.batchCount += len(encodedKvs)
w.totalCount += int64(len(encodedKvs))
return w.writer.writeKVs(encodedKvs)
return w.writer.writeKVs(kvs)
}

func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) error {
Expand All @@ -1036,14 +1038,15 @@ func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) er
if cnt > 0 {
lastKey = w.writeBatch[cnt-1].Key
}
keyAdapter := w.engine.keyAdapter
for _, pair := range kvs {
if w.isWriteBatchSorted && bytes.Compare(lastKey, pair.Key) > 0 {
w.isWriteBatchSorted = false
}
lastKey = pair.Key
w.batchSize += int64(len(pair.Key) + len(pair.Val))
buf := w.kvBuffer.AllocBytes(w.engine.keyAdapter.EncodedLen(pair.Key))
key := w.engine.keyAdapter.Encode(buf, pair.Key, pair.RowID, pair.Offset)
buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key))
key := keyAdapter.Encode(buf[:0], pair.Key, pair.RowID)
val := w.kvBuffer.AddBytes(pair.Val)
if cnt < l {
w.writeBatch[cnt].Key = key
Expand All @@ -1060,7 +1063,6 @@ func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) er
return err
}
}
w.totalCount += int64(len(kvs))
return nil
}

Expand Down Expand Up @@ -1099,7 +1101,6 @@ func (w *Writer) flush(ctx context.Context) error {
return nil
}

w.totalSize += w.batchSize
if len(w.writeBatch) > 0 {
if err := w.flushKVs(ctx); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -1157,7 +1158,6 @@ func (w *Writer) flushKVs(ctx context.Context) error {
w.isWriteBatchSorted = true
}

writer.minKey = append(writer.minKey[:0], w.writeBatch[0].Key...)
err = writer.writeKVs(w.writeBatch[:w.batchCount])
if err != nil {
return errors.Trace(err)
Expand All @@ -1171,7 +1171,6 @@ func (w *Writer) flushKVs(ctx context.Context) error {
return errors.Trace(err)
}

w.totalSize += w.batchSize
w.batchSize = 0
w.batchCount = 0
w.kvBuffer.Reset()
Expand Down Expand Up @@ -1222,7 +1221,9 @@ func (sw *sstWriter) writeKVs(kvs []common.KvPair) error {
if len(kvs) == 0 {
return nil
}

if len(sw.minKey) == 0 {
sw.minKey = append([]byte{}, kvs[0].Key...)
}
if bytes.Compare(kvs[0].Key, sw.maxKey) <= 0 {
return errorUnorderedSSTInsertion
}
Expand All @@ -1241,9 +1242,10 @@ func (sw *sstWriter) writeKVs(kvs []common.KvPair) error {
return errors.Trace(err)
}
sw.totalSize += int64(len(p.Key)) + int64(len(p.Val))
lastKey = p.Key
}
sw.totalCount += int64(len(kvs))
sw.maxKey = append(sw.maxKey[:0], kvs[len(kvs)-1].Key...)
sw.maxKey = append(sw.maxKey[:0], lastKey...)
return nil
}

Expand Down
Loading