Skip to content

Commit

Permalink
2pc: reduce memory footprint by using a more compact mutations list (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Zejun Li authored Oct 16, 2020
1 parent a891b4c commit 62190f3
Show file tree
Hide file tree
Showing 15 changed files with 250 additions and 773 deletions.
4 changes: 4 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ type MemBufferIterator interface {
HasValue() bool
Flags() KeyFlags
UpdateFlags(...FlagsOp)
Handle() MemKeyHandle
}

// MemBuffer is an in-memory kv collection, can be used to buffer write operations.
Expand All @@ -195,6 +196,9 @@ type MemBuffer interface {
// UpdateFlags update the flags associated with key.
UpdateFlags(Key, ...FlagsOp)

GetKeyByHandle(MemKeyHandle) []byte
GetValueByHandle(MemKeyHandle) ([]byte, bool)

// Reset reset the MemBuffer to initial states.
Reset()
// DiscardValues releases the memory used by all values.
Expand Down
28 changes: 28 additions & 0 deletions kv/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,18 @@ var tombstone = []byte{}
// IsTombstone returns whether the value is a tombstone.
func IsTombstone(val []byte) bool { return len(val) == 0 }

// MemKeyHandle represents a pointer for key in MemBuffer.
type MemKeyHandle struct {
// Opaque user data
UserData uint16
idx uint16
off uint32
}

func (h MemKeyHandle) toAddr() memdbArenaAddr {
return memdbArenaAddr{idx: uint32(h.idx), off: h.off}
}

// memdb is rollbackable Red-Black Tree optimized for TiDB's transaction states buffer use scenario.
// You can think memdb is a combination of two separate tree map, one for key => value and another for key => keyFlags.
//
Expand Down Expand Up @@ -299,6 +311,22 @@ func (db *memdb) Delete(key Key) error {
return db.set(key, tombstone)
}

func (db *memdb) GetKeyByHandle(handle MemKeyHandle) []byte {
x := db.getNode(handle.toAddr())
return x.getKey()
}

func (db *memdb) GetValueByHandle(handle MemKeyHandle) ([]byte, bool) {
if db.vlogInvalid {
return nil, false
}
x := db.getNode(handle.toAddr())
if x.vptr.isNull() {
return nil, false
}
return db.vlog.getValue(x.vptr), true
}

func (db *memdb) Len() int {
return db.count
}
Expand Down
7 changes: 7 additions & 0 deletions kv/memdb_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ func (i *memdbIterator) Key() Key {
return i.curr.getKey()
}

func (i *memdbIterator) Handle() MemKeyHandle {
return MemKeyHandle{
idx: uint16(i.curr.addr.idx),
off: i.curr.addr.off,
}
}

func (i *memdbIterator) Value() []byte {
return i.db.vlog.getValue(i.curr.vptr)
}
Expand Down
38 changes: 22 additions & 16 deletions session/schema_amender.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func isInsertOp(keyOp pb.Op) bool {
// amendOp is an amend operation for a specific schema change, new mutations will be generated using input ones.
type amendOp interface {
genMutations(ctx context.Context, sctx sessionctx.Context, commitMutations tikv.CommitterMutations, kvMap *rowKvMap,
resultMutations *tikv.CommitterMutations) error
resultMutations *tikv.PlainMutations) error
}

// amendOperationAddIndex represents one amend operation related to a specific add index change.
Expand Down Expand Up @@ -335,9 +335,11 @@ func (a *amendOperationAddIndexInfo) String() string {
}

func (a *amendOperationDeleteOldIndex) genMutations(ctx context.Context, sctx sessionctx.Context,
commitMutations tikv.CommitterMutations, kvMap *rowKvMap, resAddMutations *tikv.CommitterMutations) error {
for i, key := range commitMutations.GetKeys() {
keyOp := commitMutations.GetOps()[i]
commitMutations tikv.CommitterMutations, kvMap *rowKvMap, resAddMutations *tikv.PlainMutations) error {
count := commitMutations.Len()
for i := 0; i < count; i++ {
key := commitMutations.GetKey(i)
keyOp := commitMutations.GetOp(i)
if tablecodec.IsIndexKey(key) || tablecodec.DecodeTableID(key) != a.info.tblInfoAtCommit.Meta().ID {
continue
}
Expand All @@ -353,9 +355,11 @@ func (a *amendOperationDeleteOldIndex) genMutations(ctx context.Context, sctx se
}

func (a *amendOperationAddNewIndex) genMutations(ctx context.Context, sctx sessionctx.Context, commitMutations tikv.CommitterMutations,
kvMap *rowKvMap, resAddMutations *tikv.CommitterMutations) error {
for i, key := range commitMutations.GetKeys() {
keyOp := commitMutations.GetOps()[i]
kvMap *rowKvMap, resAddMutations *tikv.PlainMutations) error {
count := commitMutations.Len()
for i := 0; i < count; i++ {
key := commitMutations.GetKey(i)
keyOp := commitMutations.GetOp(i)
if tablecodec.IsIndexKey(key) || tablecodec.DecodeTableID(key) != a.info.tblInfoAtCommit.Meta().ID {
continue
}
Expand Down Expand Up @@ -415,7 +419,7 @@ func (a *amendOperationAddIndexInfo) genIndexKeyValue(ctx context.Context, sctx
}

func (a *amendOperationAddNewIndex) processRowKey(ctx context.Context, sctx sessionctx.Context, key []byte,
kvMap map[string][]byte, resAddMutations *tikv.CommitterMutations) error {
kvMap map[string][]byte, resAddMutations *tikv.PlainMutations) error {
kvHandle, err := tablecodec.DecodeRowKey(key)
if err != nil {
logutil.Logger(ctx).Error("decode key error", zap.String("key", hex.EncodeToString(key)), zap.Error(err))
Expand All @@ -431,7 +435,7 @@ func (a *amendOperationAddNewIndex) processRowKey(ctx context.Context, sctx sess
}

func (a *amendOperationDeleteOldIndex) processRowKey(ctx context.Context, sctx sessionctx.Context, key []byte,
oldValKvMap map[string][]byte, resAddMutations *tikv.CommitterMutations) error {
oldValKvMap map[string][]byte, resAddMutations *tikv.PlainMutations) error {
kvHandle, err := tablecodec.DecodeRowKey(key)
if err != nil {
logutil.Logger(ctx).Error("decode key error", zap.String("key", hex.EncodeToString(key)), zap.Error(err))
Expand Down Expand Up @@ -461,13 +465,15 @@ func NewSchemaAmenderForTikvTxn(sess *session) *SchemaAmender {
}

func (s *SchemaAmender) getAmendableKeys(commitMutations tikv.CommitterMutations, info *amendCollector) ([]kv.Key, []kv.Key) {
addKeys := make([]kv.Key, 0, len(commitMutations.GetKeys()))
removeKeys := make([]kv.Key, 0, len(commitMutations.GetKeys()))
for i, byteKey := range commitMutations.GetKeys() {
count := commitMutations.Len()
addKeys := make([]kv.Key, 0, count)
removeKeys := make([]kv.Key, 0, count)
for i := 0; i < count; i++ {
byteKey := commitMutations.GetKey(i)
if tablecodec.IsIndexKey(byteKey) || !info.keyHasAmendOp(byteKey) {
continue
}
keyOp := commitMutations.GetOps()[i]
keyOp := commitMutations.GetOp(i)
if pb.Op_Put == keyOp {
addKeys = append(addKeys, byteKey)
removeKeys = append(removeKeys, byteKey)
Expand Down Expand Up @@ -522,13 +528,13 @@ func (s *SchemaAmender) prepareKvMap(ctx context.Context, commitMutations tikv.C

// genAllAmendMutations generates CommitterMutations for all tables and related amend operations.
func (s *SchemaAmender) genAllAmendMutations(ctx context.Context, commitMutations tikv.CommitterMutations,
info *amendCollector) (*tikv.CommitterMutations, error) {
info *amendCollector) (*tikv.PlainMutations, error) {
rowKvMap, err := s.prepareKvMap(ctx, commitMutations, info)
if err != nil {
return nil, err
}
// Do generate add/remove mutations processing each key.
resultNewMutations := tikv.NewCommiterMutations(32)
resultNewMutations := tikv.NewPlainMutations(32)
for _, amendOps := range info.tblAmendOpMap {
for _, curOp := range amendOps {
err := curOp.genMutations(ctx, s.sess, commitMutations, rowKvMap, &resultNewMutations)
Expand All @@ -543,7 +549,7 @@ func (s *SchemaAmender) genAllAmendMutations(ctx context.Context, commitMutation
// AmendTxn does check and generate amend mutations based on input infoSchema and mutations, mutations need to prewrite
// are returned, the input commitMutations will not be changed.
func (s *SchemaAmender) AmendTxn(ctx context.Context, startInfoSchema tikv.SchemaVer, change *tikv.RelatedSchemaChange,
commitMutations tikv.CommitterMutations) (*tikv.CommitterMutations, error) {
commitMutations tikv.CommitterMutations) (tikv.CommitterMutations, error) {
// Get info schema meta
infoSchemaAtStart := startInfoSchema.(infoschema.InfoSchema)
infoSchemaAtCheck := change.LatestInfoSchema.(infoschema.InfoSchema)
Expand Down
26 changes: 11 additions & 15 deletions session/schema_amender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func initTblColIdxID(metaInfo *model.TableInfo) {
metaInfo.State = model.StatePublic
}

func mutationsEqual(res *tikv.CommitterMutations, expected *tikv.CommitterMutations, c *C) {
func mutationsEqual(res tikv.CommitterMutations, expected tikv.CommitterMutations, c *C) {
c.Assert(len(res.GetKeys()), Equals, len(expected.GetKeys()))
for i := 0; i < len(res.GetKeys()); i++ {
foundIdx := -1
Expand All @@ -69,10 +69,10 @@ func mutationsEqual(res *tikv.CommitterMutations, expected *tikv.CommitterMutati
}
}
c.Assert(foundIdx, GreaterEqual, 0)
c.Assert(res.GetOps()[i], Equals, expected.GetOps()[foundIdx])
c.Assert(res.GetPessimisticFlags()[i], Equals, expected.GetPessimisticFlags()[foundIdx])
c.Assert(res.GetOp(i), Equals, expected.GetOp(foundIdx))
c.Assert(res.IsPessimisticLock(i), Equals, expected.IsPessimisticLock(foundIdx))
c.Assert(res.GetKeys()[i], BytesEquals, expected.GetKeys()[foundIdx])
c.Assert(res.GetValues()[i], BytesEquals, expected.GetValues()[foundIdx])
c.Assert(res.GetValue(i), BytesEquals, expected.GetValue(foundIdx))
}
}

Expand All @@ -83,8 +83,8 @@ type data struct {
rowValue [][]types.Datum
}

func prepareTestData(se *session, mutations *tikv.CommitterMutations, oldTblInfo table.Table, newTblInfo table.Table,
expecetedAmendOps []amendOp, c *C) (*data, *data, tikv.CommitterMutations) {
func prepareTestData(se *session, mutations *tikv.PlainMutations, oldTblInfo table.Table, newTblInfo table.Table,
expecetedAmendOps []amendOp, c *C) (*data, *data, *tikv.PlainMutations) {
var err error
// Generated test data.
colIds := make([]int64, len(oldTblInfo.Meta().Columns))
Expand All @@ -104,7 +104,7 @@ func prepareTestData(se *session, mutations *tikv.CommitterMutations, oldTblInfo
rd := rowcodec.Encoder{Enable: true}
newData := &data{}
oldData := &data{}
expecteMutations := tikv.NewCommiterMutations(8)
expecteMutations := tikv.NewPlainMutations(8)

// Generate old data.
for i := 0; i < len(KeyOps); i++ {
Expand Down Expand Up @@ -220,7 +220,7 @@ func prepareTestData(se *session, mutations *tikv.CommitterMutations, oldTblInfo
}
}
}
return newData, oldData, expecteMutations
return newData, oldData, &expecteMutations
}

func (s *testSchemaAmenderSuite) TestAmendCollectAndGenMutations(c *C) {
Expand Down Expand Up @@ -350,7 +350,7 @@ func (s *testSchemaAmenderSuite) TestAmendCollectAndGenMutations(c *C) {
}
}
// Generated test data.
mutations := tikv.NewCommiterMutations(8)
mutations := tikv.NewPlainMutations(8)
newData, oldData, expectedMutations := prepareTestData(se, &mutations, oldTbInfo, newTblInfo, expectedAmendOps, c)
// Prepare old data in table.
txnPrepare, err := se.store.Begin()
Expand Down Expand Up @@ -405,14 +405,10 @@ func (s *testSchemaAmenderSuite) TestAmendCollectAndGenMutations(c *C) {
mutations.Push(kvrpcpb.Op_Put, idxKey, idxValue, false)
}

res, err := schemaAmender.genAllAmendMutations(ctx, mutations, collector)
res, err := schemaAmender.genAllAmendMutations(ctx, &mutations, collector)
c.Assert(err, IsNil)

// Validate generated results.
c.Assert(len(res.GetKeys()), Equals, len(res.GetOps()))
c.Assert(len(res.GetValues()), Equals, len(res.GetOps()))
c.Assert(len(res.GetPessimisticFlags()), Equals, len(res.GetOps()))
mutationsEqual(res, &expectedMutations, c)
mutationsEqual(res, expectedMutations, c)
err = txn.Rollback()
c.Assert(err, IsNil)
}
Expand Down
Loading

0 comments on commit 62190f3

Please sign in to comment.