Skip to content

Commit

Permalink
gcworker: fix gc miss locks when region merged during scanning & reso…
Browse files Browse the repository at this point in the history
…lving locks (pingcap#22252)

Signed-off-by: lysu <sulifx@gmail.com>
  • Loading branch information
lysu authored Jan 7, 2021
1 parent d1d5cc4 commit bedd599
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 17 deletions.
20 changes: 13 additions & 7 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ type GCWorker struct {
cancel context.CancelFunc
done chan error
testingKnobs struct {
scanLocks func(key []byte) []*tikv.Lock
resolveLocks func(regionID tikv.RegionVerID) (ok bool, err error)
scanLocks func(key []byte, regionID uint64) []*tikv.Lock
resolveLocks func(locks []*tikv.Lock, regionID tikv.RegionVerID) (ok bool, err error)
}
}

Expand Down Expand Up @@ -1064,12 +1064,18 @@ retryScanAndResolve:
locks[i] = tikv.NewLock(locksInfo[i])
}
if w.testingKnobs.scanLocks != nil {
locks = append(locks, w.testingKnobs.scanLocks(key)...)
locks = append(locks, w.testingKnobs.scanLocks(key, loc.Region.GetID())...)
}
locForResolve := loc
for {
ok, err1 := w.store.GetLockResolver().BatchResolveLocks(bo, locks, loc.Region)
var (
ok bool
err1 error
)
if w.testingKnobs.resolveLocks != nil {
ok, err1 = w.testingKnobs.resolveLocks(loc.Region)
ok, err1 = w.testingKnobs.resolveLocks(locks, locForResolve.Region)
} else {
ok, err1 = w.store.GetLockResolver().BatchResolveLocks(bo, locks, locForResolve.Region)
}
if err1 != nil {
return stat, errors.Trace(err1)
Expand All @@ -1084,7 +1090,7 @@ retryScanAndResolve:
return stat, errors.Trace(err)
}
if stillInSame {
loc = refreshedLoc
locForResolve = refreshedLoc
continue
}
continue retryScanAndResolve
Expand All @@ -1097,7 +1103,7 @@ retryScanAndResolve:
} else {
logutil.Logger(ctx).Info("[gc worker] region has more than limit locks",
zap.String("uuid", w.uuid),
zap.Uint64("region", loc.Region.GetID()),
zap.Uint64("region", locForResolve.Region.GetID()),
zap.Int("scan lock limit", gcScanLockLimit))
metrics.GCRegionTooManyLocksCounter.Inc()
key = locks[len(locks)-1].Key
Expand Down
95 changes: 85 additions & 10 deletions store/tikv/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/pingcap/tidb/store/mockoracle"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/cluster"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
Expand All @@ -51,13 +52,18 @@ func TestT(t *testing.T) {
}

type testGCWorkerSuite struct {
store tikv.Storage
cluster cluster.Cluster
oracle *mockoracle.MockOracle
gcWorker *GCWorker
dom *domain.Domain
client *testGCWorkerClient
pdClient pd.Client
store tikv.Storage
cluster cluster.Cluster
oracle *mockoracle.MockOracle
gcWorker *GCWorker
dom *domain.Domain
client *testGCWorkerClient
pdClient pd.Client
initRegion struct {
storeIDs []uint64
peerIDs []uint64
regionID uint64
}
}

var _ = SerialSuites(&testGCWorkerSuite{})
Expand All @@ -74,8 +80,9 @@ func (s *testGCWorkerSuite) SetUpTest(c *C) {
}

store, err := mockstore.NewMockStore(
mockstore.WithStoreType(mockstore.MockTiKV),
mockstore.WithClusterInspector(func(c cluster.Cluster) {
mockstore.BootstrapWithMultiStores(c, 3)
s.initRegion.storeIDs, s.initRegion.peerIDs, s.initRegion.regionID, _ = mockstore.BootstrapWithMultiStores(c, 3)
s.cluster = c
}),
mockstore.WithClientHijacker(hijackClient),
Expand Down Expand Up @@ -849,7 +856,7 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionCacheMiss(c *C) {
resolveCnt int
resolveCntRef = &resolveCnt
)
s.gcWorker.testingKnobs.scanLocks = func(key []byte) []*tikv.Lock {
s.gcWorker.testingKnobs.scanLocks = func(key []byte, regionID uint64) []*tikv.Lock {
*scanCntRef++
return []*tikv.Lock{
{
Expand All @@ -860,7 +867,7 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionCacheMiss(c *C) {
},
}
}
s.gcWorker.testingKnobs.resolveLocks = func(regionID tikv.RegionVerID) (ok bool, err error) {
s.gcWorker.testingKnobs.resolveLocks = func(locks []*tikv.Lock, regionID tikv.RegionVerID) (ok bool, err error) {
*resolveCntRef++
if *resolveCntRef == 1 {
s.gcWorker.store.GetRegionCache().InvalidateCachedRegion(regionID)
Expand All @@ -875,6 +882,74 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionCacheMiss(c *C) {
c.Assert(scanCnt, Equals, 1)
}

func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionEnlargeCausedByRegionMerge(c *C) {
var (
firstAccess = true
firstAccessRef = &firstAccess
resolvedLock [][]byte
)

// key range: ['' - 'm' - 'z']
region2 := s.cluster.AllocID()
newPeers := []uint64{s.cluster.AllocID(), s.cluster.AllocID(), s.cluster.AllocID()}
s.cluster.Split(s.initRegion.regionID, region2, []byte("m"), newPeers, newPeers[0])

// init a, b lock in region1 and o, p locks in region2
s.gcWorker.testingKnobs.scanLocks = func(key []byte, regionID uint64) []*tikv.Lock {
if regionID == s.initRegion.regionID {
return []*tikv.Lock{{Key: []byte("a")}, {Key: []byte("b")}}
}
if regionID == region2 {
return []*tikv.Lock{{Key: []byte("o")}, {Key: []byte("p")}}
}
return []*tikv.Lock{}
}

s.gcWorker.testingKnobs.resolveLocks = func(locks []*tikv.Lock, regionID tikv.RegionVerID) (ok bool, err error) {
if regionID.GetID() == s.initRegion.regionID && *firstAccessRef {
*firstAccessRef = false
// merge region2 into region1 and return EpochNotMatch error.
mCluster := s.cluster.(*mocktikv.Cluster)
mCluster.Merge(s.initRegion.regionID, region2)
regionMeta, _ := mCluster.GetRegion(s.initRegion.regionID)
s.store.GetRegionCache().OnRegionEpochNotMatch(
tikv.NewNoopBackoff(context.Background()),
&tikv.RPCContext{Region: regionID, Store: &tikv.Store{}},
[]*metapb.Region{regionMeta})
// also let region1 contains all 4 locks
s.gcWorker.testingKnobs.scanLocks = func(key []byte, regionID uint64) []*tikv.Lock {
if regionID == s.initRegion.regionID {
locks := []*tikv.Lock{
{Key: []byte("a")},
{Key: []byte("b")},
{Key: []byte("o")},
{Key: []byte("p")},
}
for i, lock := range locks {
if bytes.Compare(key, lock.Key) <= 0 {
return locks[i:]
}
}
}
return []*tikv.Lock{}
}
return false, nil
}
for _, lock := range locks {
resolvedLock = append(resolvedLock, lock.Key)
}
return true, nil
}

_, err := s.gcWorker.resolveLocksForRange(context.Background(), 1, []byte(""), []byte("z"))
c.Assert(err, IsNil)
c.Assert(len(resolvedLock), Equals, 4)
expects := [][]byte{[]byte("a"), []byte("b"), []byte("o"), []byte("p")}
for i, l := range resolvedLock {
c.Assert(l, BytesEquals, expects[i])
}
}

func (s *testGCWorkerSuite) TestRunGCJob(c *C) {
gcSafePointCacheInterval = 0

Expand Down

0 comments on commit bedd599

Please sign in to comment.