-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
store/tikv: Ignore error and do gc anyway #5797
Changes from all commits
d1c6181
774980e
3a09833
1f724ac
68c8f4b
65ef1bf
971928b
f54618f
68b5a60
6702d83
4112b67
30516f1
9e0c930
e02f351
8360650
afd9c75
d24eb68
3d0155a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ import ( | |
"time" | ||
|
||
"github.com/juju/errors" | ||
"github.com/pingcap/kvproto/pkg/errorpb" | ||
"github.com/pingcap/kvproto/pkg/kvrpcpb" | ||
"github.com/pingcap/tidb" | ||
"github.com/pingcap/tidb/ddl/util" | ||
|
@@ -90,6 +91,7 @@ func (w *GCWorker) Close() { | |
const ( | ||
gcTimeFormat = "20060102-15:04:05 -0700 MST" | ||
gcWorkerTickInterval = time.Minute | ||
gcJobLogTickInterval = time.Minute * 10 | ||
gcWorkerLease = time.Minute * 2 | ||
gcLeaderUUIDKey = "tikv_gc_leader_uuid" | ||
gcLeaderDescKey = "tikv_gc_leader_desc" | ||
|
@@ -222,7 +224,7 @@ func (w *GCWorker) leaderTick(ctx goctx.Context) error { | |
} | ||
|
||
w.gcIsRunning = true | ||
log.Infof("[gc worker] %s starts GC job, safePoint: %v", w.uuid, safePoint) | ||
log.Infof("[gc worker] %s starts the whole job, safePoint: %v", w.uuid, safePoint) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ?? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I changed the log because it is too similar to the log in the func |
||
go w.runGCJob(ctx, safePoint) | ||
return nil | ||
} | ||
|
@@ -316,21 +318,23 @@ func (w *GCWorker) runGCJob(ctx goctx.Context, safePoint uint64) { | |
gcWorkerCounter.WithLabelValues("run_job").Inc() | ||
err := resolveLocks(ctx, w.store, safePoint, w.uuid) | ||
if err != nil { | ||
gcFailureCounter.WithLabelValues("resolve_lock").Inc() | ||
log.Errorf("[gc worker] %s resolve locks returns an error %v", w.uuid, err) | ||
gcJobFailureCounter.WithLabelValues("resolve_lock").Inc() | ||
w.done <- errors.Trace(err) | ||
return | ||
} | ||
err = w.deleteRanges(ctx, safePoint) | ||
if err != nil { | ||
gcFailureCounter.WithLabelValues("delete_range").Inc() | ||
log.Errorf("[gc worker] %s delete range returns an error %v", w.uuid, err) | ||
gcJobFailureCounter.WithLabelValues("delete_range").Inc() | ||
w.done <- errors.Trace(err) | ||
return | ||
} | ||
err = doGC(ctx, w.store, safePoint, w.uuid) | ||
if err != nil { | ||
gcFailureCounter.WithLabelValues("gc").Inc() | ||
log.Error("do GC returns an error", err) | ||
log.Errorf("[gc worker] %s do GC returns an error %v", w.uuid, err) | ||
w.gcIsRunning = false | ||
gcJobFailureCounter.WithLabelValues("gc").Inc() | ||
w.done <- errors.Trace(err) | ||
return | ||
} | ||
|
@@ -347,7 +351,7 @@ func (w *GCWorker) deleteRanges(ctx goctx.Context, safePoint uint64) error { | |
return errors.Trace(err) | ||
} | ||
|
||
bo := tikv.NewBackoffer(tikv.GcDeleteRangeMaxBackoff, goctx.Background()) | ||
bo := tikv.NewBackoffer(tikv.GcDeleteRangeMaxBackoff, ctx) | ||
log.Infof("[gc worker] %s start delete %v ranges", w.uuid, len(ranges)) | ||
startTime := time.Now() | ||
regions := 0 | ||
|
@@ -431,7 +435,7 @@ func resolveLocks(ctx goctx.Context, store tikv.Storage, safePoint uint64, ident | |
Limit: gcScanLockLimit, | ||
}, | ||
} | ||
bo := tikv.NewBackoffer(tikv.GcResolveLockMaxBackoff, goctx.Background()) | ||
bo := tikv.NewBackoffer(tikv.GcResolveLockMaxBackoff, ctx) | ||
|
||
log.Infof("[gc worker] %s start resolve locks, safePoint: %v.", identifier, safePoint) | ||
startTime := time.Now() | ||
|
@@ -498,7 +502,7 @@ func resolveLocks(ctx goctx.Context, store tikv.Storage, safePoint uint64, ident | |
} | ||
} else { | ||
log.Infof("[gc worker] %s, region %d has more than %d locks", identifier, loc.Region.GetID(), gcScanLockLimit) | ||
gcRegionTooMuchLocksCounter.Inc() | ||
gcRegionTooManyLocksCounter.Inc() | ||
key = locks[len(locks)-1].Key | ||
} | ||
} | ||
|
@@ -518,63 +522,96 @@ func doGC(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier st | |
// Sleep to wait for all other tidb instances update their safepoint cache. | ||
time.Sleep(gcSafePointCacheInterval) | ||
|
||
req := &tikvrpc.Request{ | ||
Type: tikvrpc.CmdGC, | ||
GC: &kvrpcpb.GCRequest{ | ||
SafePoint: safePoint, | ||
}, | ||
} | ||
bo := tikv.NewBackoffer(tikv.GcMaxBackoff, goctx.Background()) | ||
|
||
log.Infof("[gc worker] %s start gc, safePoint: %v.", identifier, safePoint) | ||
startTime := time.Now() | ||
regions := 0 | ||
successRegions := 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's better to add some metrics. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
failedRegions := 0 | ||
|
||
ticker := time.NewTicker(gcJobLogTickInterval) | ||
defer ticker.Stop() | ||
|
||
bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx) | ||
var key []byte | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return errors.New("[gc worker] gc job canceled") | ||
case <-ticker.C: | ||
log.Infof("[gc worker] %s gc in process, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s", | ||
identifier, safePoint, successRegions, failedRegions, time.Since(startTime)) | ||
default: | ||
} | ||
|
||
loc, err := store.GetRegionCache().LocateKey(bo, key) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
resp, err := store.SendReq(bo, req, loc.Region, tikv.ReadTimeoutLong) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
regionErr, err := resp.GetRegionError() | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
|
||
var regionErr *errorpb.Error | ||
regionErr, err = doGCForOneRegion(bo, store, safePoint, loc.Region) | ||
|
||
// we check regionErr here first, because we know 'regionErr' and 'err' should not return together, to keep it to | ||
// make the process correct. | ||
if regionErr != nil { | ||
err = bo.Backoff(tikv.BoRegionMiss, errors.New(regionErr.String())) | ||
if err != nil { | ||
return errors.Trace(err) | ||
if err == nil { | ||
continue | ||
} | ||
continue | ||
} | ||
gcResp := resp.GC | ||
if gcResp == nil { | ||
return errors.Trace(tikv.ErrBodyMissing) | ||
} | ||
if gcResp.GetError() != nil { | ||
return errors.Errorf("unexpected gc error: %s", gcResp.GetError()) | ||
|
||
if err != nil { | ||
failedRegions++ | ||
gcActionRegionResultCounter.WithLabelValues("fail").Inc() | ||
log.Warnf("[gc worker] %s failed to do gc on region(%s, %s), ignore it", identifier, string(loc.StartKey), string(loc.EndKey)) | ||
} else { | ||
successRegions++ | ||
gcActionRegionResultCounter.WithLabelValues("success").Inc() | ||
} | ||
regions++ | ||
|
||
key = loc.EndKey | ||
if len(key) == 0 { | ||
break | ||
} | ||
bo = tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx) | ||
} | ||
log.Infof("[gc worker] %s finish gc, safePoint: %v, regions: %v, cost time: %s", identifier, safePoint, regions, time.Since(startTime)) | ||
log.Infof("[gc worker] %s finish gc, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s", | ||
identifier, safePoint, successRegions, failedRegions, time.Since(startTime)) | ||
gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds()) | ||
return nil | ||
} | ||
|
||
// these two errors should not return together, for more, see the func 'doGC' | ||
func doGCForOneRegion(bo *tikv.Backoffer, store tikv.Storage, safePoint uint64, region tikv.RegionVerID) (*errorpb.Error, error) { | ||
req := &tikvrpc.Request{ | ||
Type: tikvrpc.CmdGC, | ||
GC: &kvrpcpb.GCRequest{ | ||
SafePoint: safePoint, | ||
}, | ||
} | ||
|
||
resp, err := store.SendReq(bo, req, region, tikv.GCTimeout) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
regionErr, err := resp.GetRegionError() | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
if regionErr != nil { | ||
return regionErr, nil | ||
} | ||
|
||
gcResp := resp.GC | ||
if gcResp == nil { | ||
return nil, errors.Trace(tikv.ErrBodyMissing) | ||
} | ||
if gcResp.GetError() != nil { | ||
return nil, errors.Errorf("unexpected gc error: %s", gcResp.GetError()) | ||
} | ||
|
||
return nil, nil | ||
} | ||
|
||
func (w *GCWorker) checkLeader() (bool, error) { | ||
gcWorkerCounter.WithLabelValues("check_leader").Inc() | ||
session := createSession(w.store) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change this value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think the gc command can definitely return in 20 secs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I will change it for one request