Skip to content

Commit

Permalink
gcworker: skip GC if the safe point isn't changed (#22482)
Browse files Browse the repository at this point in the history
Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>
  • Loading branch information
youjiali1995 authored Jan 25, 2021
1 parent a88cc8c commit f3688eb
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 30 deletions.
44 changes: 26 additions & 18 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (w *GCWorker) checkPrepare(ctx context.Context) (bool, uint64, error) {
if err != nil || !ok {
return false, 0, errors.Trace(err)
}
newSafePoint, newSafePointValue, err := w.calculateNewSafePoint(ctx, now)
newSafePoint, newSafePointValue, err := w.calcNewSafePoint(ctx, now)
if err != nil || newSafePoint == nil {
return false, 0, errors.Trace(err)
}
Expand All @@ -339,12 +339,10 @@ func (w *GCWorker) checkPrepare(ctx context.Context) (bool, uint64, error) {
return true, newSafePointValue, nil
}

// calculateNewSafePoint uses the current global transaction min start timestamp to calculate the new safe point.
func (w *GCWorker) calSafePointByMinStartTS(ctx context.Context, safePoint time.Time) time.Time {
func (w *GCWorker) calcGlobalMinStartTS(ctx context.Context) (uint64, error) {
kvs, err := w.store.GetSafePointKV().GetWithPrefix(infosync.ServerMinStartTSPath)
if err != nil {
logutil.Logger(ctx).Warn("get all minStartTS failed", zap.Error(err))
return safePoint
return 0, err
}

var globalMinStartTS uint64 = math.MaxUint64
Expand All @@ -358,14 +356,23 @@ func (w *GCWorker) calSafePointByMinStartTS(ctx context.Context, safePoint time.
globalMinStartTS = minStartTS
}
}
return globalMinStartTS, nil
}

safePointTS := variable.GoTimeToTS(safePoint)
if globalMinStartTS < safePointTS {
safePoint = time.Unix(0, oracle.ExtractPhysical(globalMinStartTS)*1e6)
// calcSafePointByMinStartTS returns the smaller of safePoint and the current global transaction min start timestamp; if the min start timestamp cannot be read, safePoint is returned unchanged.
func (w *GCWorker) calcSafePointByMinStartTS(ctx context.Context, safePoint uint64) uint64 {
globalMinStartTS, err := w.calcGlobalMinStartTS(ctx)
if err != nil {
logutil.Logger(ctx).Warn("get all minStartTS failed", zap.Error(err))
return safePoint
}

if globalMinStartTS < safePoint {
logutil.Logger(ctx).Info("[gc worker] gc safepoint blocked by a running session",
zap.String("uuid", w.uuid),
zap.Uint64("globalMinStartTS", globalMinStartTS),
zap.Time("safePoint", safePoint))
zap.Uint64("safePoint", safePoint))
safePoint = globalMinStartTS
}
return safePoint
}
Expand All @@ -375,9 +382,7 @@ func (w *GCWorker) getOracleTime() (time.Time, error) {
if err != nil {
return time.Time{}, errors.Trace(err)
}
physical := oracle.ExtractPhysical(currentVer.Ver)
sec, nsec := physical/1e3, (physical%1e3)*1e6
return time.Unix(sec, nsec), nil
return oracle.GetTimeFromTS(currentVer.Ver), nil
}

func (w *GCWorker) checkGCEnable() (bool, error) {
Expand Down Expand Up @@ -481,7 +486,7 @@ func (w *GCWorker) validateGCLifeTime(lifeTime time.Duration) (time.Duration, er
return gcMinLifeTime, err
}

func (w *GCWorker) calculateNewSafePoint(ctx context.Context, now time.Time) (*time.Time, uint64, error) {
func (w *GCWorker) calcNewSafePoint(ctx context.Context, now time.Time) (*time.Time, uint64, error) {
lifeTime, err := w.loadDurationWithDefault(gcLifeTimeKey, gcDefaultLifeTime)
if err != nil {
return nil, 0, errors.Trace(err)
Expand All @@ -491,21 +496,24 @@ func (w *GCWorker) calculateNewSafePoint(ctx context.Context, now time.Time) (*t
return nil, 0, err
}
metrics.GCConfigGauge.WithLabelValues(gcLifeTimeKey).Set(lifeTime.Seconds())

lastSafePoint, err := w.loadTime(gcSafePointKey)
if err != nil {
return nil, 0, errors.Trace(err)
}
safePoint := w.calSafePointByMinStartTS(ctx, now.Add(-*lifeTime))

safePointValue := oracle.ComposeTS(oracle.GetPhysical(safePoint), 0)
safePointValue := w.calcSafePointByMinStartTS(ctx, variable.GoTimeToTS(now.Add(-*lifeTime)))
safePointValue, err = w.setGCWorkerServiceSafePoint(ctx, safePointValue)
safePoint = oracle.GetTimeFromTS(safePointValue)

if err != nil {
return nil, 0, errors.Trace(err)
}

// The safe point is recorded in time.Time format, which strips the logical part of the timestamp.
// If we compared raw timestamps, the lost logical part could make an unchanged safe point look newer
// and keep the GC worker running needlessly, so we compare the two in time.Time format.
safePoint := oracle.GetTimeFromTS(safePointValue)
// We should never decrease safePoint, and GC is skipped when it has not advanced.
if lastSafePoint != nil && safePoint.Before(*lastSafePoint) {
if lastSafePoint != nil && !safePoint.After(*lastSafePoint) {
logutil.BgLogger().Info("[gc worker] last safe point is later than current one."+
"No need to gc."+
"This might be caused by manually enlarging gc lifetime",
Expand Down
36 changes: 24 additions & 12 deletions store/tikv/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,30 +241,29 @@ func (s *testGCWorkerSuite) TestMinStartTS(c *C) {
spkv := s.store.GetSafePointKV()
err := spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), strconv.FormatUint(math.MaxUint64, 10))
c.Assert(err, IsNil)
now := time.Now()
sp := s.gcWorker.calSafePointByMinStartTS(ctx, now)
c.Assert(sp.Second(), Equals, now.Second())
now := variable.GoTimeToTS(time.Now())
sp := s.gcWorker.calcSafePointByMinStartTS(ctx, now)
c.Assert(sp, Equals, now)
err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), "0")
c.Assert(err, IsNil)
sp = s.gcWorker.calSafePointByMinStartTS(ctx, now)
zeroTime := time.Unix(0, oracle.ExtractPhysical(0)*1e6)
c.Assert(sp, Equals, zeroTime)
sp = s.gcWorker.calcSafePointByMinStartTS(ctx, now)
c.Assert(sp, Equals, uint64(0))

err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), "0")
c.Assert(err, IsNil)
err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "b"), "1")
c.Assert(err, IsNil)
sp = s.gcWorker.calSafePointByMinStartTS(ctx, now)
c.Assert(sp, Equals, zeroTime)
sp = s.gcWorker.calcSafePointByMinStartTS(ctx, now)
c.Assert(sp, Equals, uint64(0))

err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"),
strconv.FormatUint(variable.GoTimeToTS(now), 10))
strconv.FormatUint(now, 10))
c.Assert(err, IsNil)
err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "b"),
strconv.FormatUint(variable.GoTimeToTS(now.Add(-20*time.Second)), 10))
strconv.FormatUint(now-oracle.EncodeTSO(20000), 10))
c.Assert(err, IsNil)
sp = s.gcWorker.calSafePointByMinStartTS(ctx, now.Add(-10*time.Second))
c.Assert(sp.Second(), Equals, now.Add(-20*time.Second).Second())
sp = s.gcWorker.calcSafePointByMinStartTS(ctx, now-oracle.EncodeTSO(10000))
c.Assert(sp, Equals, now-oracle.EncodeTSO(20000))
}

func (s *testGCWorkerSuite) TestPrepareGC(c *C) {
Expand Down Expand Up @@ -378,6 +377,19 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) {
useAutoConcurrency, err = s.gcWorker.checkUseAutoConcurrency()
c.Assert(err, IsNil)
c.Assert(useAutoConcurrency, IsTrue)

// Check skipping GC if safe point is not changed.
safePointTime, err := s.gcWorker.loadTime(gcSafePointKey)
minStartTS := variable.GoTimeToTS(*safePointTime) + 1
c.Assert(err, IsNil)
spkv := s.store.GetSafePointKV()
err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), strconv.FormatUint(minStartTS, 10))
c.Assert(err, IsNil)
s.oracle.AddOffset(time.Minute * 40)
ok, safepoint, err := s.gcWorker.prepare()
c.Assert(err, IsNil)
c.Assert(ok, IsFalse)
c.Assert(safepoint, Equals, uint64(0))
}

func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) {
Expand Down

0 comments on commit f3688eb

Please sign in to comment.