Skip to content

Commit

Permalink
gc_worker: resolve locks on offline stores (#18383)
Browse files Browse the repository at this point in the history
Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>
  • Loading branch information
youjiali1995 authored Jul 14, 2020
1 parent a6971dd commit 2024e44
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 33 deletions.
33 changes: 18 additions & 15 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (w *GCWorker) getGCConcurrency(ctx context.Context) (int, error) {
return w.loadGCConcurrencyWithDefault()
}

stores, err := w.getUpStoresForGC(ctx)
stores, err := w.getStoresForGC(ctx)
concurrency := len(stores)
if err != nil {
logutil.Logger(ctx).Error("[gc worker] failed to get up stores to calculate concurrency. use config.",
Expand Down Expand Up @@ -458,8 +458,8 @@ func (w *GCWorker) checkGCInterval(now time.Time) (bool, error) {
return true, nil
}

// validateGCLiftTime checks whether life time is small than min gc life time.
func (w *GCWorker) validateGCLiftTime(lifeTime time.Duration) (time.Duration, error) {
// validateGCLifeTime checks whether life time is small than min gc life time.
func (w *GCWorker) validateGCLifeTime(lifeTime time.Duration) (time.Duration, error) {
if lifeTime >= gcMinLifeTime {
return lifeTime, nil
}
Expand All @@ -477,7 +477,7 @@ func (w *GCWorker) calculateNewSafePoint(ctx context.Context, now time.Time) (*t
if err != nil {
return nil, 0, errors.Trace(err)
}
*lifeTime, err = w.validateGCLiftTime(*lifeTime)
*lifeTime, err = w.validateGCLifeTime(*lifeTime)
if err != nil {
return nil, 0, err
}
Expand Down Expand Up @@ -714,7 +714,7 @@ func (w *GCWorker) redoDeleteRanges(ctx context.Context, safePoint uint64, concu

func (w *GCWorker) doUnsafeDestroyRangeRequest(ctx context.Context, startKey []byte, endKey []byte, concurrency int) error {
// Get all stores every time deleting a region. So the store list is less probably to be stale.
stores, err := w.getUpStoresForGC(ctx)
stores, err := w.getStoresForGC(ctx)
if err != nil {
logutil.Logger(ctx).Error("[gc worker] delete ranges: got an error while trying to get store list from PD",
zap.String("uuid", w.uuid),
Expand Down Expand Up @@ -790,8 +790,14 @@ const (
// needsGCOperationForStore checks if the store-level requests related to GC needs to be sent to the store. The store-level
// requests includes UnsafeDestroyRange, PhysicalScanLock, etc.
func needsGCOperationForStore(store *metapb.Store) (bool, error) {
engineLabel := ""
// TombStone means the store has been removed from the cluster and there isn't any peer on the store, so needn't do GC for it.
// Offline means the store is being removed from the cluster and it becomes tombstone after all peers are removed from it,
// so we need to do GC for it.
if store.State == metapb.StoreState_Tombstone {
return false, nil
}

engineLabel := ""
for _, label := range store.GetLabels() {
if label.GetKey() == engineLabelKey {
engineLabel = label.GetValue()
Expand Down Expand Up @@ -820,18 +826,15 @@ func needsGCOperationForStore(store *metapb.Store) (bool, error) {
}
}

// getUpStoresForGC gets the list of stores that needs to be processed during GC.
func (w *GCWorker) getUpStoresForGC(ctx context.Context) ([]*metapb.Store, error) {
// getStoresForGC gets the list of stores that needs to be processed during GC.
func (w *GCWorker) getStoresForGC(ctx context.Context) ([]*metapb.Store, error) {
stores, err := w.pdClient.GetAllStores(ctx)
if err != nil {
return nil, errors.Trace(err)
}

upStores := make([]*metapb.Store, 0, len(stores))
for _, store := range stores {
if store.State != metapb.StoreState_Up {
continue
}
needsGCOp, err := needsGCOperationForStore(store)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -843,8 +846,8 @@ func (w *GCWorker) getUpStoresForGC(ctx context.Context) ([]*metapb.Store, error
return upStores, nil
}

func (w *GCWorker) getUpStoresMapForGC(ctx context.Context) (map[uint64]*metapb.Store, error) {
stores, err := w.getUpStoresForGC(ctx)
func (w *GCWorker) getStoresMapForGC(ctx context.Context) (map[uint64]*metapb.Store, error) {
stores, err := w.getStoresForGC(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1087,7 +1090,7 @@ func (w *GCWorker) resolveLocksPhysical(ctx context.Context, safePoint uint64) e
registeredStores := make(map[uint64]*metapb.Store)
defer w.removeLockObservers(ctx, safePoint, registeredStores)

dirtyStores, err := w.getUpStoresMapForGC(ctx)
dirtyStores, err := w.getStoresMapForGC(ctx)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1108,7 +1111,7 @@ func (w *GCWorker) resolveLocksPhysical(ctx context.Context, safePoint uint64) e

failpoint.Inject("beforeCheckLockObservers", func() {})

stores, err := w.getUpStoresMapForGC(ctx)
stores, err := w.getStoresMapForGC(ctx)
if err != nil {
return errors.Trace(err)
}
Expand Down
39 changes: 21 additions & 18 deletions store/tikv/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,32 +507,35 @@ func (s *testGCWorkerSuite) TestCheckScanLockMode(c *C) {
}

func (s *testGCWorkerSuite) TestNeedsGCOperationForStore(c *C) {
newStore := func(hasEngineLabel bool, engineLabel string) *metapb.Store {
newStore := func(state metapb.StoreState, hasEngineLabel bool, engineLabel string) *metapb.Store {
store := &metapb.Store{}
store.State = state
if hasEngineLabel {
store.Labels = []*metapb.StoreLabel{{Key: engineLabelKey, Value: engineLabel}}
}
return store
}

// TiKV needs to do the store-level GC operations.
res, err := needsGCOperationForStore(newStore(false, ""))
c.Assert(err, IsNil)
c.Assert(res, IsTrue)
res, err = needsGCOperationForStore(newStore(true, ""))
c.Assert(err, IsNil)
c.Assert(res, IsTrue)
res, err = needsGCOperationForStore(newStore(true, engineLabelTiKV))
c.Assert(err, IsNil)
c.Assert(res, IsTrue)

// TiFlash does not need these operations.
res, err = needsGCOperationForStore(newStore(true, engineLabelTiFlash))
c.Assert(err, IsNil)
c.Assert(res, IsFalse)
for _, state := range []metapb.StoreState{metapb.StoreState_Up, metapb.StoreState_Offline, metapb.StoreState_Tombstone} {
needGC := state != metapb.StoreState_Tombstone
res, err := needsGCOperationForStore(newStore(state, false, ""))
c.Assert(err, IsNil)
c.Assert(res, Equals, needGC)
res, err = needsGCOperationForStore(newStore(state, true, ""))
c.Assert(err, IsNil)
c.Assert(res, Equals, needGC)
res, err = needsGCOperationForStore(newStore(state, true, engineLabelTiKV))
c.Assert(err, IsNil)
c.Assert(res, Equals, needGC)

// TiFlash does not need these operations.
res, err = needsGCOperationForStore(newStore(state, true, engineLabelTiFlash))
c.Assert(err, IsNil)
c.Assert(res, IsFalse)
}
// Throw an error for unknown store types.
_, err = needsGCOperationForStore(newStore(true, "invalid"))
_, err := needsGCOperationForStore(newStore(metapb.StoreState_Up, true, "invalid"))
c.Assert(err, NotNil)
}

Expand Down Expand Up @@ -579,7 +582,7 @@ func (s *testGCWorkerSuite) testDeleteRangesFailureImpl(c *C, failType int) {
c.Assert(err, IsNil)
c.Assert(preparedRanges, DeepEquals, ranges)

stores, err := s.gcWorker.getUpStoresForGC(context.Background())
stores, err := s.gcWorker.getStoresForGC(context.Background())
c.Assert(err, IsNil)
c.Assert(len(stores), Equals, 3)

Expand Down Expand Up @@ -1000,7 +1003,7 @@ func (s *testGCWorkerSuite) makeMergedMockClient(c *C, count int) (*mergeLockSca

const scanLockLimit = 3

storesMap, err := s.gcWorker.getUpStoresMapForGC(context.Background())
storesMap, err := s.gcWorker.getStoresMapForGC(context.Background())
c.Assert(err, IsNil)
scanner := newMergeLockScanner(100000, s.client, storesMap)
scanner.scanLockLimit = scanLockLimit
Expand Down

0 comments on commit 2024e44

Please sign in to comment.