From c9000abdc216b6a02efbcc578af8be1f98ba280d Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 23 Oct 2019 16:50:59 +0800 Subject: [PATCH] store,kv: snapshot doesn't cache the non-exists kv entries lead to poor 'insert ignore' performance (#12872) (#12898) --- kv/kv.go | 2 ++ store/mockstore/mocktikv/rpc.go | 7 +++++++ store/tikv/snapshot.go | 17 ++++++++++++++--- store/tikv/snapshot_test.go | 21 +++++++++++++++++++++ 4 files changed, 44 insertions(+), 3 deletions(-) diff --git a/kv/kv.go b/kv/kv.go index f8f86f953ffff..09d903581edf2 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -156,6 +156,8 @@ type Transaction interface { // SetAssertion sets an assertion for an operation on the key. SetAssertion(key Key, assertion AssertionType) // BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage. + // Do not use len(value) == 0 or value == nil to represent non-exist. + // If a key doesn't exist, there shouldn't be any corresponding entry in the result map. BatchGet(keys []Key) (map[string][]byte, error) IsPessimistic() bool } diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index 05bc593b60e28..f566cae797e05 100755 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -23,6 +23,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/coprocessor" @@ -681,6 +682,12 @@ func (c *RPCClient) checkArgs(ctx context.Context, addr string) (*rpcHandler, er // SendRequest sends a request to mock cluster. func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("RPCClient.SendRequest", opentracing.ChildOf(span.Context())) + defer span1.Finish() + ctx = opentracing.ContextWithSpan(ctx, span1) + } + failpoint.Inject("rpcServerBusy", func(val failpoint.Value) { if val.(bool) { failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}})) diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index a1a9fda17ef29..3f86eb3cf6000 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -60,6 +60,10 @@ type tikvSnapshot struct { // Cache the result of BatchGet. // The invariance is that calling BatchGet multiple times using the same start ts, // the result should not change. + // NOTE: This representation here is different from the BatchGet API. + // cached use len(value)=0 to represent a key-value entry doesn't exist (a reliable truth from TiKV). + // In the BatchGet API, it use no key-value entry to represent non-exist. + // It's OK as long as there are no zero-byte values in the protocol. cached map[string][]byte } @@ -92,7 +96,9 @@ func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { tmp := keys[:0] for _, key := range keys { if val, ok := s.cached[string(key)]; ok { - m[string(key)] = val + if len(val) > 0 { + m[string(key)] = val + } } else { tmp = append(tmp, key) } @@ -118,6 +124,7 @@ func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { if len(v) == 0 { return } + mu.Lock() m[string(k)] = v mu.Unlock() @@ -135,8 +142,8 @@ func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { if s.cached == nil { s.cached = make(map[string][]byte, len(m)) } - for key, value := range m { - s.cached[key] = value + for _, key := range keys { + s.cached[string(key)] = m[string(key)] } return m, nil @@ -273,6 +280,10 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { } } + failpoint.Inject("snapshot-get-cache-fail", func(_ failpoint.Value) { + panic("cache miss") + }) + sender := NewRegionRequestSender(s.store.regionCache, s.store.client) req := &tikvrpc.Request{ diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go index 4b6e1af441e9f..de412fec23aab 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/snapshot_test.go @@ -19,6 +19,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/logutil" @@ -117,6 +118,26 @@ func (s *testSnapshotSuite) TestBatchGet(c *C) { } } +func (s *testSnapshotSuite) TestSnapshotCache(c *C) { + txn := s.beginTxn(c) + c.Assert(txn.Set(kv.Key("x"), []byte("x")), IsNil) + c.Assert(txn.Commit(context.Background()), IsNil) + + txn = s.beginTxn(c) + snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()}) + _, err := snapshot.BatchGet([]kv.Key{kv.Key("x"), kv.Key("y")}) + c.Assert(err, IsNil) + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/snapshot-get-cache-fail", `return(true)`), IsNil) + _, err = snapshot.Get(kv.Key("x")) + c.Assert(err, IsNil) + + _, err = snapshot.Get(kv.Key("y")) + c.Assert(kv.IsErrNotFound(err), IsTrue) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/snapshot-get-cache-fail"), IsNil) +} + func (s *testSnapshotSuite) TestBatchGetNotExist(c *C) { for _, rowNum := range s.rowNums { logutil.Logger(context.Background()).Debug("test BatchGetNotExist",