Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gc_worker: fix serveral bugs of green gc #16413

Merged
merged 15 commits into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 79 additions & 56 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,6 @@ func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64, concurren

// First try resolve locks with physical scan
err := w.resolveLocksPhysical(ctx, safePoint)

if err == nil {
return nil
}
Expand Down Expand Up @@ -1071,27 +1070,31 @@ func (w *GCWorker) resolveLocksPhysical(ctx context.Context, safePoint uint64) e
zap.Uint64("safePoint", safePoint))
startTime := time.Now()

stores, err := w.getUpStoresMapForGC(ctx)
if err != nil {
return errors.Trace(err)
}

defer func() {
w.removeLockObservers(ctx, safePoint, stores)
}()
registeredStores := make(map[uint64]*metapb.Store)
defer w.removeLockObservers(ctx, safePoint, registeredStores)

err = w.registerLockObservers(ctx, safePoint, stores)
dirtyStores, err := w.getUpStoresMapForGC(ctx)
if err != nil {
return errors.Trace(err)
}

for retry := 0; retry < 3; retry++ {
resolvedStores, err := w.physicalScanAndResolveLocks(ctx, safePoint, stores)
err = w.registerLockObservers(ctx, safePoint, dirtyStores)
if err != nil {
return errors.Trace(err)
}
for id, store := range dirtyStores {
registeredStores[id] = store
}

resolvedStores, err := w.physicalScanAndResolveLocks(ctx, safePoint, dirtyStores)
if err != nil {
return errors.Trace(err)
}

stores, err = w.getUpStoresMapForGC(ctx)
failpoint.Inject("beforeCheckLockObservers", func() {})

stores, err := w.getUpStoresMapForGC(ctx)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1101,22 +1104,38 @@ func (w *GCWorker) resolveLocksPhysical(ctx context.Context, safePoint uint64) e
return errors.Trace(err)
}

// Remove clean stores from the set
for resolvedStore := range resolvedStores {
// Only stores that are both resolved and checked is clean.
// For each clean store, remove it from the stores set.
if _, ok := checkedStores[resolvedStore]; ok {
delete(stores, resolvedStore)
for store := range stores {
if _, ok := checkedStores[store]; ok {
// The store is resolved and checked.
if _, ok := resolvedStores[store]; ok {
delete(stores, store)
}
// The store is checked and has been resolved before.
if _, ok := dirtyStores[store]; !ok {
delete(stores, store)
}
Comment on lines +1113 to +1116
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolved stores may become dirty again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It will be scanned and resolved again.

// If the store is checked and not resolved, we can retry to resolve it again, so leave it in dirtyStores.
} else if _, ok := registeredStores[store]; ok {
// The store has been registered and it's dirty due to too many collected locks. Fall back to legacy mode.
// We can't remove the lock observer from the store and retry the whole procedure because if the store
// receives duplicated remove and register requests during resolving locks, the store will be cleaned
// when checking but the lock observer drops some locks. It may results in missing locks.
return errors.Errorf("store %v is dirty", store)
}
}
dirtyStores = stores

// If there are still dirty stores, continue the loop to clean them again.
// Only dirty stores will be scanned in the next loop.
if len(stores) == 0 {
if len(dirtyStores) == 0 {
break
}
}

if len(dirtyStores) != 0 {
return errors.Errorf("still has %d dirty stores after physical resolve locks", len(dirtyStores))
}

logutil.Logger(ctx).Info("[gc worker] finish resolve locks with physical scan locks",
zap.String("uuid", w.uuid),
zap.Uint64("safePoint", safePoint),
Expand All @@ -1141,7 +1160,9 @@ func (w *GCWorker) registerLockObservers(ctx context.Context, safePoint uint64,
if err != nil {
return errors.Trace(err)
}

if resp.Resp == nil {
return errors.Trace(tikv.ErrBodyMissing)
}
errStr := resp.Resp.(*kvrpcpb.RegisterLockObserverResponse).Error
if len(errStr) > 0 {
return errors.Errorf("register lock observer on store %v returns error: %v", store.Id, errStr)
Expand All @@ -1161,31 +1182,41 @@ func (w *GCWorker) checkLockObservers(ctx context.Context, safePoint uint64, sto
req := tikvrpc.NewRequest(tikvrpc.CmdCheckLockObserver, &kvrpcpb.CheckLockObserverRequest{
MaxTs: safePoint,
})

cleanStores := make(map[uint64]interface{}, len(stores))

logError := func(store *metapb.Store, err error) {
logutil.Logger(ctx).Error("[gc worker] failed to check lock observer for store",
zap.String("uuid", w.uuid),
zap.Any("store", store),
zap.Error(err))
}

// When error occurs, this function doesn't fail immediately, but continues without adding the failed store to
// cleanStores set.

for _, store := range stores {
address := store.Address

resp, err := w.store.GetTiKVClient().SendRequest(ctx, address, req, tikv.AccessLockObserverTimeout)
if err != nil {
logutil.Logger(ctx).Error("[gc worker] failed to check lock observer for store",
zap.String("uuid", w.uuid),
zap.Any("store", store),
zap.Error(err))
logError(store, err)
continue
}
if resp.Resp == nil {
logError(store, tikv.ErrBodyMissing)
continue
}

respInner := resp.Resp.(*kvrpcpb.CheckLockObserverResponse)
if len(respInner.Error) > 0 {
err = errors.Errorf("check lock observer on store %v returns error: %v", store.Id, respInner.Error)
logutil.Logger(ctx).Error("[gc worker] failed to check lock observer for store",
logError(store, err)
continue
}

// No need to resolve observed locks on uncleaned stores.
if !respInner.IsClean {
logutil.Logger(ctx).Warn("[gc worker] check lock observer: store is not clean",
zap.String("uuid", w.uuid),
zap.Any("store", store),
zap.Error(err))
zap.Any("store", store))
continue
}

Expand All @@ -1202,21 +1233,11 @@ func (w *GCWorker) checkLockObservers(ctx context.Context, safePoint uint64, sto

if err != nil {
err = errors.Errorf("check lock observer on store %v returns error: %v", store.Id, respInner.Error)
logutil.Logger(ctx).Error("[gc worker] failed to check lock observer for store",
zap.String("uuid", w.uuid),
zap.Any("store", store),
zap.Error(err))
logError(store, err)
continue
}
}

if respInner.IsClean {
cleanStores[store.Id] = nil
} else {
logutil.Logger(ctx).Warn("[gc worker] check lock observer: store is not clean",
zap.String("uuid", w.uuid),
zap.Any("store", store))
}
cleanStores[store.Id] = nil
}

return cleanStores, nil
Expand All @@ -1231,25 +1252,29 @@ func (w *GCWorker) removeLockObservers(ctx context.Context, safePoint uint64, st
MaxTs: safePoint,
})

logError := func(store *metapb.Store, err error) {
logutil.Logger(ctx).Warn("[gc worker] failed to remove lock observer from store",
zap.String("uuid", w.uuid),
zap.Any("store", store),
zap.Error(err))
}

for _, store := range stores {
address := store.Address

resp, err := w.store.GetTiKVClient().SendRequest(ctx, address, req, tikv.AccessLockObserverTimeout)
if err != nil {
logutil.Logger(ctx).Warn("[gc worker] failed to remove lock observer from store",
zap.String("uuid", w.uuid),
zap.Any("store", store),
zap.Error(err))
logError(store, err)
continue
}
if resp.Resp == nil {
logError(store, tikv.ErrBodyMissing)
continue
}

errStr := resp.Resp.(*kvrpcpb.RemoveLockObserverResponse).Error
if len(errStr) > 0 {
err = errors.Errorf("remove lock observer on store %v returns error: %v", store.Id, errStr)
logutil.Logger(ctx).Error("[gc worker] failed to remove lock observer from store",
zap.String("uuid", w.uuid),
zap.Any("store", store),
zap.Error(err))
logError(store, err)
}
}
}
Expand Down Expand Up @@ -1982,12 +2007,10 @@ func (s *mergeLockScanner) physicalScanLocksForStore(ctx context.Context, safePo
if err != nil {
return errors.Trace(err)
}

resp := response.Resp.(*kvrpcpb.PhysicalScanLockResponse)
if resp == nil {
return errors.Errorf("physical scan lock response is nil")
if response.Resp == nil {
return errors.Trace(tikv.ErrBodyMissing)
}

resp := response.Resp.(*kvrpcpb.PhysicalScanLockResponse)
if len(resp.Error) > 0 {
return errors.Errorf("physical scan lock received error from store: %v", resp.Error)
}
Expand Down
Loading