Skip to content

Commit

Permalink
gc_worker: add more tests for resolveLockPhysical (#18715)
Browse files Browse the repository at this point in the history
Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>
  • Loading branch information
youjiali1995 authored Aug 4, 2020
1 parent 75d58d5 commit 159adef
Showing 1 changed file with 44 additions and 26 deletions.
70 changes: 44 additions & 26 deletions store/tikv/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1277,30 +1277,34 @@ func (s *testGCWorkerSuite) TestResolveLocksPhysical(c *C) {
}

ctx := context.Background()
var safePoint uint64 = 10000

// No lock
reset()
err := s.gcWorker.resolveLocksPhysical(ctx, 10000)
physicalUsed, err := s.gcWorker.resolveLocks(ctx, safePoint, 3, true)
c.Assert(physicalUsed, IsTrue)
c.Assert(err, IsNil)

// Should return error when fails to register lock observers.
// Should fall back on the legacy mode when fails to register lock observers.
reset()
s.client.registerLockObserverHandler = alwaysFailHandler
err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
c.Assert(err, ErrorMatches, "register lock observer.*")
physicalUsed, err = s.gcWorker.resolveLocks(ctx, safePoint, 3, true)
c.Assert(physicalUsed, IsFalse)
c.Assert(err, IsNil)

// Should return error when fails to resolve locks.
// Should fall back when fails to resolve locks.
reset()
s.client.physicalScanLockHandler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
locks := []*kvrpcpb.LockInfo{{Key: []byte{0}}}
return &tikvrpc.Response{Resp: &kvrpcpb.PhysicalScanLockResponse{Locks: locks, Error: ""}}, nil
}
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gcworker/resolveLocksAcrossRegionsErr", "return(100)"), IsNil)
err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
physicalUsed, err = s.gcWorker.resolveLocks(ctx, safePoint, 3, true)
c.Assert(physicalUsed, IsFalse)
c.Assert(err, IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gcworker/resolveLocksAcrossRegionsErr"), IsNil)
c.Assert(err, ErrorMatches, "injectedError")

// Shouldn't return error when fails to scan locks.
// Shouldn't fall back when fails to scan locks less than 3 times.
reset()
var returnError uint32 = 1
s.client.physicalScanLockHandler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
Expand All @@ -1309,38 +1313,43 @@ func (s *testGCWorkerSuite) TestResolveLocksPhysical(c *C) {
}
return alwaysSucceedHanlder(addr, req)
}
err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
physicalUsed, err = s.gcWorker.resolveLocks(ctx, safePoint, 3, true)
c.Assert(physicalUsed, IsTrue)
c.Assert(err, IsNil)

// Should return error if reaches retry limit
// Should fall back if reaches retry limit
reset()
s.client.physicalScanLockHandler = alwaysFailHandler
err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
c.Assert(err, ErrorMatches, ".*dirty.*")
physicalUsed, err = s.gcWorker.resolveLocks(ctx, safePoint, 3, true)
c.Assert(physicalUsed, IsFalse)
c.Assert(err, IsNil)

// Should return error when one registered store is dirty.
// Should fall back when one registered store is dirty.
reset()
s.client.checkLockObserverHandler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
return &tikvrpc.Response{Resp: &kvrpcpb.CheckLockObserverResponse{Error: "", IsClean: false, Locks: nil}}, nil
}
err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
c.Assert(err, ErrorMatches, "store.*dirty")
physicalUsed, err = s.gcWorker.resolveLocks(ctx, safePoint, 3, true)
c.Assert(physicalUsed, IsFalse)
c.Assert(err, IsNil)

// Should return error when fails to check lock observers.
// When fails to check lock observer in a store, we assume the store is dirty.
// Should fall back when fails to check lock observers.
reset()
s.client.checkLockObserverHandler = alwaysFailHandler
err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
// When fails to check lock observer in a store, we assume the store is dirty.
c.Assert(err, ErrorMatches, "store.*dirty")
physicalUsed, err = s.gcWorker.resolveLocks(ctx, safePoint, 3, true)
c.Assert(physicalUsed, IsFalse)
c.Assert(err, IsNil)

// Shouldn't return error when the dirty store is newly added.
// Shouldn't fall back when the dirty store is newly added.
reset()
var wg sync.WaitGroup
wg.Add(1)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers", "pause"), IsNil)
go func() {
defer wg.Done()
err := s.gcWorker.resolveLocksPhysical(ctx, 10000)
physicalUsed, err := s.gcWorker.resolveLocks(ctx, safePoint, 3, true)
c.Assert(physicalUsed, IsTrue)
c.Assert(err, IsNil)
}()
// Sleep to let the goroutine pause.
Expand All @@ -1358,13 +1367,14 @@ func (s *testGCWorkerSuite) TestResolveLocksPhysical(c *C) {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers"), IsNil)
wg.Wait()

// Shouldn't return error when a store is removed.
// Shouldn't fall back when a store is removed.
reset()
wg.Add(1)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers", "pause"), IsNil)
go func() {
defer wg.Done()
err := s.gcWorker.resolveLocksPhysical(ctx, 10000)
physicalUsed, err := s.gcWorker.resolveLocks(ctx, safePoint, 3, true)
c.Assert(physicalUsed, IsTrue)
c.Assert(err, IsNil)
}()
// Sleep to let the goroutine pause.
Expand All @@ -1373,14 +1383,15 @@ func (s *testGCWorkerSuite) TestResolveLocksPhysical(c *C) {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers"), IsNil)
wg.Wait()

// Should return error when a cleaned store becomes dirty.
// Should fall back when a cleaned store becomes dirty.
reset()
wg.Add(1)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers", "pause"), IsNil)
go func() {
defer wg.Done()
err := s.gcWorker.resolveLocksPhysical(ctx, 10000)
c.Assert(err, ErrorMatches, "store.*dirty")
physicalUsed, err := s.gcWorker.resolveLocks(ctx, safePoint, 3, true)
c.Assert(physicalUsed, IsFalse)
c.Assert(err, IsNil)
}()
// Sleep to let the goroutine pause.
time.Sleep(500 * time.Millisecond)
Expand Down Expand Up @@ -1408,6 +1419,13 @@ func (s *testGCWorkerSuite) TestResolveLocksPhysical(c *C) {
}
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers"), IsNil)
wg.Wait()

// Shouldn't fall back when fails to remove lock observers.
reset()
s.client.removeLockObserverHandler = alwaysFailHandler
physicalUsed, err = s.gcWorker.resolveLocks(ctx, safePoint, 3, true)
c.Assert(physicalUsed, IsTrue)
c.Assert(err, IsNil)
}

func (s *testGCWorkerSuite) TestPhyscailScanLockDeadlock(c *C) {
Expand Down

0 comments on commit 159adef

Please sign in to comment.