Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: set memdb's size limits after create #24215

Merged
merged 7 commits into from
Apr 25, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package txn

import (
"context"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
Expand All @@ -33,6 +34,11 @@ type tikvTxn struct {
// NewTiKVTxn returns a new Transaction.
func NewTiKVTxn(txn *tikv.KVTxn) kv.Transaction {
txn.SetOption(tikvstore.KVFilter, TiDBKVFilter{})

entryLimit := atomic.LoadUint64(&kv.TxnEntrySizeLimit)
totalLimit := atomic.LoadUint64(&kv.TxnTotalSizeLimit)
txn.GetUnionStore().SetEntrySizeLimit(entryLimit, totalLimit)

return &tikvTxn{txn, make(map[int64]*model.TableInfo)}
}

Expand Down
4 changes: 0 additions & 4 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/failpoint"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/terror"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
Expand Down Expand Up @@ -397,9 +396,6 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
}
c.txnSize = size

if size > int(tidbkv.TxnTotalSizeLimit) {
return tidbkv.ErrTxnTooLarge.GenWithStackByArgs(size)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks we don't need it because it is checked whenever memdb is updated.

const logEntryCount = 10000
const logSize = 4 * 1024 * 1024 // 4MB
if c.mutations.Len() > logEntryCount || size > logSize {
Expand Down
6 changes: 3 additions & 3 deletions store/tikv/unionstore/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ package unionstore

import (
"bytes"
"math"
"reflect"
"sync"
"sync/atomic"
"unsafe"

tidbkv "github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -72,8 +72,8 @@ func newMemDB() *MemDB {
db.allocator.init()
db.root = nullAddr
db.stages = make([]memdbCheckpoint, 0, 2)
db.entrySizeLimit = atomic.LoadUint64(&tidbkv.TxnEntrySizeLimit)
db.bufferSizeLimit = atomic.LoadUint64(&tidbkv.TxnTotalSizeLimit)
db.entrySizeLimit = math.MaxUint64
db.bufferSizeLimit = math.MaxUint64
return db
}

Expand Down
18 changes: 5 additions & 13 deletions store/tikv/unionstore/memdb_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package unionstore

import (
"encoding/binary"
"math"
"math/rand"
"testing"
)
Expand All @@ -25,19 +24,12 @@ const (
valueSize = 128
)

func newMemDBForBench() *MemDB {
db := newMemDB()
db.bufferSizeLimit = math.MaxUint64
db.entrySizeLimit = math.MaxUint64
return db
}

func BenchmarkLargeIndex(b *testing.B) {
buf := make([][valueSize]byte, 10000000)
for i := range buf {
binary.LittleEndian.PutUint32(buf[i][:], uint32(i))
}
db := newMemDBForBench()
db := newMemDB()
b.ResetTimer()

for i := range buf {
Expand All @@ -51,7 +43,7 @@ func BenchmarkPut(b *testing.B) {
binary.BigEndian.PutUint32(buf[i][:], uint32(i))
}

p := newMemDBForBench()
p := newMemDB()
b.ResetTimer()

for i := range buf {
Expand All @@ -65,7 +57,7 @@ func BenchmarkPutRandom(b *testing.B) {
binary.LittleEndian.PutUint32(buf[i][:], uint32(rand.Int()))
}

p := newMemDBForBench()
p := newMemDB()
b.ResetTimer()

for i := range buf {
Expand All @@ -79,7 +71,7 @@ func BenchmarkGet(b *testing.B) {
binary.BigEndian.PutUint32(buf[i][:], uint32(i))
}

p := newMemDBForBench()
p := newMemDB()
for i := range buf {
p.Set(buf[i][:keySize], buf[i][:])
}
Expand All @@ -96,7 +88,7 @@ func BenchmarkGetRandom(b *testing.B) {
binary.LittleEndian.PutUint32(buf[i][:], uint32(rand.Int()))
}

p := newMemDBForBench()
p := newMemDB()
for i := range buf {
p.Set(buf[i][:keySize], buf[i][:])
}
Expand Down
3 changes: 0 additions & 3 deletions store/tikv/unionstore/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"context"
"encoding/binary"
"fmt"
"math"
"testing"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -71,8 +70,6 @@ func (s *testMemDBSuite) TestGetSet(c *C) {

func (s *testMemDBSuite) TestBigKV(c *C) {
db := newMemDB()
db.entrySizeLimit = math.MaxUint64
db.bufferSizeLimit = math.MaxUint64
db.Set([]byte{1}, make([]byte, 80<<20))
c.Assert(db.vlog.blockSize, Equals, maxBlockSize)
c.Assert(len(db.vlog.blocks), Equals, 1)
Expand Down
6 changes: 6 additions & 0 deletions store/tikv/unionstore/union_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ func (us *KVUnionStore) GetOption(opt int) interface{} {
return us.opts[opt]
}

// SetEntrySizeLimit sets the size limit for each entry and total buffer.
func (us *KVUnionStore) SetEntrySizeLimit(entryLimit, bufferLimit uint64) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we should attention that it is ok for tikv-client to update the entrysizelimit in a transaction.

us.memBuffer.entrySizeLimit = entryLimit
us.memBuffer.bufferSizeLimit = bufferLimit
}

type options map[int]interface{}

func (opts options) Get(opt int) (interface{}, bool) {
Expand Down