From 792d9582154175eb8df3cb2c6006461330b63b16 Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Wed, 10 Jun 2020 14:18:00 +0800 Subject: [PATCH] gc_worker: Add logs about adjusting gc safepoint by global minimum start ts (#17892) Co-authored-by: MyonKeminta Co-authored-by: pingcap-github-bot --- store/tikv/gcworker/gc_worker.go | 25 ++++++++++++++++--------- store/tikv/gcworker/gc_worker_test.go | 9 +++++---- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index 9fb727a04d30a..1dd366f294224 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -330,26 +330,33 @@ func (w *GCWorker) checkPrepare(ctx context.Context) (bool, uint64, error) { } // calculateNewSafePoint uses the current global transaction min start timestamp to calculate the new safe point. -func (w *GCWorker) calSafePointByMinStartTS(safePoint time.Time) time.Time { +func (w *GCWorker) calSafePointByMinStartTS(ctx context.Context, safePoint time.Time) time.Time { kvs, err := w.store.GetSafePointKV().GetWithPrefix(infosync.ServerMinStartTSPath) if err != nil { - logutil.BgLogger().Warn("get all minStartTS failed", zap.Error(err)) + logutil.Logger(ctx).Warn("get all minStartTS failed", zap.Error(err)) return safePoint } - safePointTS := variable.GoTimeToTS(safePoint) + var globalMinStartTS uint64 = math.MaxUint64 for _, v := range kvs { minStartTS, err := strconv.ParseUint(string(v.Value), 10, 64) if err != nil { - logutil.BgLogger().Warn("parse minStartTS failed", zap.Error(err)) + logutil.Logger(ctx).Warn("parse minStartTS failed", zap.Error(err)) continue } - if minStartTS < safePointTS { - safePointTS = minStartTS + if minStartTS < globalMinStartTS { + globalMinStartTS = minStartTS } } - safePoint = time.Unix(0, oracle.ExtractPhysical(safePointTS)*1e6) - logutil.BgLogger().Debug("calSafePointByMinStartTS", zap.Time("safePoint", safePoint)) + + safePointTS := variable.GoTimeToTS(safePoint) + if globalMinStartTS < safePointTS { + safePoint = time.Unix(0, oracle.ExtractPhysical(globalMinStartTS)*1e6) + logutil.Logger(ctx).Info("[gc worker] gc safepoint blocked by a running session", + zap.String("uuid", w.uuid), + zap.Uint64("globalMinStartTS", globalMinStartTS), + zap.Time("safePoint", safePoint)) + } return safePoint } @@ -478,7 +485,7 @@ func (w *GCWorker) calculateNewSafePoint(ctx context.Context, now time.Time) (*t if err != nil { return nil, 0, errors.Trace(err) } - safePoint := w.calSafePointByMinStartTS(now.Add(-*lifeTime)) + safePoint := w.calSafePointByMinStartTS(ctx, now.Add(-*lifeTime)) safePointValue := oracle.ComposeTS(oracle.GetPhysical(safePoint), 0) safePointValue, err = w.setGCWorkerServiceSafePoint(ctx, safePointValue) diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go index 9151e1d7214fb..3cafc340b5c9f 100644 --- a/store/tikv/gcworker/gc_worker_test.go +++ b/store/tikv/gcworker/gc_worker_test.go @@ -231,15 +231,16 @@ func (s *testGCWorkerSuite) TestGetOracleTime(c *C) { } func (s *testGCWorkerSuite) TestMinStartTS(c *C) { + ctx := context.Background() spkv := s.store.GetSafePointKV() err := spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), strconv.FormatUint(math.MaxUint64, 10)) c.Assert(err, IsNil) now := time.Now() - sp := s.gcWorker.calSafePointByMinStartTS(now) + sp := s.gcWorker.calSafePointByMinStartTS(ctx, now) c.Assert(sp.Second(), Equals, now.Second()) err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), "0") c.Assert(err, IsNil) - sp = s.gcWorker.calSafePointByMinStartTS(now) + sp = s.gcWorker.calSafePointByMinStartTS(ctx, now) zeroTime := time.Unix(0, oracle.ExtractPhysical(0)*1e6) c.Assert(sp, Equals, zeroTime) @@ -247,7 +248,7 @@ func (s *testGCWorkerSuite) TestMinStartTS(c *C) { c.Assert(err, IsNil) err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "b"), "1") c.Assert(err, IsNil) - sp = s.gcWorker.calSafePointByMinStartTS(now) + sp = s.gcWorker.calSafePointByMinStartTS(ctx, now) c.Assert(sp, Equals, zeroTime) err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), @@ -256,7 +257,7 @@ func (s *testGCWorkerSuite) TestMinStartTS(c *C) { err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "b"), strconv.FormatUint(variable.GoTimeToTS(now.Add(-20*time.Second)), 10)) c.Assert(err, IsNil) - sp = s.gcWorker.calSafePointByMinStartTS(now.Add(-10 * time.Second)) + sp = s.gcWorker.calSafePointByMinStartTS(ctx, now.Add(-10*time.Second)) c.Assert(sp.Second(), Equals, now.Add(-20*time.Second).Second()) }