diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go
index 1eb4aeb102740..b8cef211e2e8c 100644
--- a/store/tikv/backoff.go
+++ b/store/tikv/backoff.go
@@ -148,7 +148,7 @@ const (
 	getMaxBackoff           = 20000
 	prewriteMaxBackoff      = 20000
 	cleanupMaxBackoff       = 20000
-	GcMaxBackoff            = 100000
+	GcOneRegionMaxBackoff   = 20000
 	GcResolveLockMaxBackoff = 100000
 	GcDeleteRangeMaxBackoff = 100000
 	rawkvMaxBackoff         = 20000
diff --git a/store/tikv/client.go b/store/tikv/client.go
index b3e7a980bac6b..bc912eef42cc5 100644
--- a/store/tikv/client.go
+++ b/store/tikv/client.go
@@ -52,6 +52,7 @@ const (
 	readTimeoutShort  = 20 * time.Second  // For requests that read/write several key-values.
 	ReadTimeoutMedium = 60 * time.Second  // For requests that may need scan region.
 	ReadTimeoutLong   = 150 * time.Second // For requests that may need scan region multiple times.
+	GCTimeout         = 5 * time.Minute
 
 	grpcInitialWindowSize     = 1 << 30
 	grpcInitialConnWindowSize = 1 << 30
diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go
index d5f2cb150e09a..cdf1d0f748b49 100644
--- a/store/tikv/gcworker/gc_worker.go
+++ b/store/tikv/gcworker/gc_worker.go
@@ -22,6 +22,7 @@ import (
 	"time"
 
 	"github.com/juju/errors"
+	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/tidb"
 	"github.com/pingcap/tidb/ddl/util"
@@ -90,6 +91,7 @@ func (w *GCWorker) Close() {
 const (
 	gcTimeFormat         = "20060102-15:04:05 -0700 MST"
 	gcWorkerTickInterval = time.Minute
+	gcJobLogTickInterval = time.Minute * 10
 	gcWorkerLease        = time.Minute * 2
 	gcLeaderUUIDKey      = "tikv_gc_leader_uuid"
 	gcLeaderDescKey      = "tikv_gc_leader_desc"
@@ -222,7 +224,7 @@ func (w *GCWorker) leaderTick(ctx goctx.Context) error {
 	}
 
 	w.gcIsRunning = true
-	log.Infof("[gc worker] %s starts GC job, safePoint: %v", w.uuid, safePoint)
+	log.Infof("[gc worker] %s starts the whole GC job, safePoint: %v", w.uuid, safePoint)
 	go w.runGCJob(ctx, safePoint)
 	return nil
 }
@@ -316,21 +318,23 @@ func (w *GCWorker) runGCJob(ctx goctx.Context, safePoint uint64) {
 	gcWorkerCounter.WithLabelValues("run_job").Inc()
 	err := resolveLocks(ctx, w.store, safePoint, w.uuid)
 	if err != nil {
-		gcFailureCounter.WithLabelValues("resolve_lock").Inc()
+		log.Errorf("[gc worker] %s resolve locks returns an error %v", w.uuid, err)
+		gcJobFailureCounter.WithLabelValues("resolve_lock").Inc()
 		w.done <- errors.Trace(err)
 		return
 	}
 	err = w.deleteRanges(ctx, safePoint)
 	if err != nil {
-		gcFailureCounter.WithLabelValues("delete_range").Inc()
+		log.Errorf("[gc worker] %s delete range returns an error %v", w.uuid, err)
+		gcJobFailureCounter.WithLabelValues("delete_range").Inc()
 		w.done <- errors.Trace(err)
 		return
 	}
 	err = doGC(ctx, w.store, safePoint, w.uuid)
 	if err != nil {
-		gcFailureCounter.WithLabelValues("gc").Inc()
-		log.Error("do GC returns an error", err)
+		log.Errorf("[gc worker] %s do GC returns an error %v", w.uuid, err)
 		w.gcIsRunning = false
+		gcJobFailureCounter.WithLabelValues("gc").Inc()
 		w.done <- errors.Trace(err)
 		return
 	}
@@ -347,7 +351,7 @@ func (w *GCWorker) deleteRanges(ctx goctx.Context, safePoint uint64) error {
 		return errors.Trace(err)
 	}
 
-	bo := tikv.NewBackoffer(tikv.GcDeleteRangeMaxBackoff, goctx.Background())
+	bo := tikv.NewBackoffer(tikv.GcDeleteRangeMaxBackoff, ctx)
 	log.Infof("[gc worker] %s start delete %v ranges", w.uuid, len(ranges))
 	startTime := time.Now()
 	regions := 0
@@ -431,7 +435,7 @@ func resolveLocks(ctx goctx.Context, store tikv.Storage, safePoint uint64, ident
 			Limit:      gcScanLockLimit,
 		},
 	}
-	bo := tikv.NewBackoffer(tikv.GcResolveLockMaxBackoff, goctx.Background())
+	bo := tikv.NewBackoffer(tikv.GcResolveLockMaxBackoff, ctx)
 	log.Infof("[gc worker] %s start resolve locks, safePoint: %v.", identifier, safePoint)
 	startTime := time.Now()
 
@@ -498,7 +502,7 @@ func resolveLocks(ctx goctx.Context, store tikv.Storage, safePoint uint64, ident
 			}
 		} else {
 			log.Infof("[gc worker] %s, region %d has more than %d locks", identifier, loc.Region.GetID(), gcScanLockLimit)
-			gcRegionTooMuchLocksCounter.Inc()
+			gcRegionTooManyLocksCounter.Inc()
 			key = locks[len(locks)-1].Key
 		}
 	}
@@ -518,23 +522,23 @@ func doGC(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier st
 	// Sleep to wait for all other tidb instances update their safepoint cache.
 	time.Sleep(gcSafePointCacheInterval)
 
-	req := &tikvrpc.Request{
-		Type: tikvrpc.CmdGC,
-		GC: &kvrpcpb.GCRequest{
-			SafePoint: safePoint,
-		},
-	}
-	bo := tikv.NewBackoffer(tikv.GcMaxBackoff, goctx.Background())
-
 	log.Infof("[gc worker] %s start gc, safePoint: %v.", identifier, safePoint)
 	startTime := time.Now()
-	regions := 0
+	successRegions := 0
+	failedRegions := 0
+
+	ticker := time.NewTicker(gcJobLogTickInterval)
+	defer ticker.Stop()
 
+	bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx)
 	var key []byte
 	for {
 		select {
 		case <-ctx.Done():
 			return errors.New("[gc worker] gc job canceled")
+		case <-ticker.C:
+			log.Infof("[gc worker] %s gc in progress, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s",
+				identifier, safePoint, successRegions, failedRegions, time.Since(startTime))
 		default:
 		}
 
@@ -542,39 +546,72 @@ func doGC(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier st
 		if err != nil {
 			return errors.Trace(err)
 		}
-		resp, err := store.SendReq(bo, req, loc.Region, tikv.ReadTimeoutLong)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		regionErr, err := resp.GetRegionError()
-		if err != nil {
-			return errors.Trace(err)
-		}
+
+		var regionErr *errorpb.Error
+		regionErr, err = doGCForOneRegion(bo, store, safePoint, loc.Region)
+
+		// Check regionErr first: doGCForOneRegion never returns a non-nil regionErr
+		// together with a non-nil err, so the two cases can be handled separately.
 		if regionErr != nil {
 			err = bo.Backoff(tikv.BoRegionMiss, errors.New(regionErr.String()))
-			if err != nil {
-				return errors.Trace(err)
+			if err == nil {
+				continue
 			}
-			continue
 		}
-		gcResp := resp.GC
-		if gcResp == nil {
-			return errors.Trace(tikv.ErrBodyMissing)
-		}
-		if gcResp.GetError() != nil {
-			return errors.Errorf("unexpected gc error: %s", gcResp.GetError())
+
+		if err != nil {
+			failedRegions++
+			gcActionRegionResultCounter.WithLabelValues("fail").Inc()
+			log.Warnf("[gc worker] %s failed to do gc on region(%s, %s), ignore it", identifier, string(loc.StartKey), string(loc.EndKey))
+		} else {
+			successRegions++
+			gcActionRegionResultCounter.WithLabelValues("success").Inc()
 		}
-		regions++
+
 		key = loc.EndKey
 		if len(key) == 0 {
 			break
 		}
+		bo = tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx)
 	}
-	log.Infof("[gc worker] %s finish gc, safePoint: %v, regions: %v, cost time: %s", identifier, safePoint, regions, time.Since(startTime))
+	log.Infof("[gc worker] %s finish gc, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s",
+		identifier, safePoint, successRegions, failedRegions, time.Since(startTime))
 	gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds())
 	return nil
 }
 
+// doGCForOneRegion does GC on one region; it never returns a non-nil regionErr together with a non-nil err (see doGC).
+func doGCForOneRegion(bo *tikv.Backoffer, store tikv.Storage, safePoint uint64, region tikv.RegionVerID) (*errorpb.Error, error) {
+	req := &tikvrpc.Request{
+		Type: tikvrpc.CmdGC,
+		GC: &kvrpcpb.GCRequest{
+			SafePoint: safePoint,
+		},
+	}
+
+	resp, err := store.SendReq(bo, req, region, tikv.GCTimeout)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	regionErr, err := resp.GetRegionError()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if regionErr != nil {
+		return regionErr, nil
+	}
+
+	gcResp := resp.GC
+	if gcResp == nil {
+		return nil, errors.Trace(tikv.ErrBodyMissing)
+	}
+	if gcResp.GetError() != nil {
+		return nil, errors.Errorf("unexpected gc error: %s", gcResp.GetError())
+	}
+
+	return nil, nil
+}
+
 func (w *GCWorker) checkLeader() (bool, error) {
 	gcWorkerCounter.WithLabelValues("check_leader").Inc()
 	session := createSession(w.store)
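Taken together, the doGC changes above replace "any failure aborts the whole GC job" with a per-region policy: a retriable region error is retried under a fresh per-region backoff budget, while a region that still fails is counted and skipped. A minimal standalone sketch of that control flow, using simplified stand-in types rather than the real tikv/errorpb ones:

package main

import (
	"errors"
	"fmt"
)

// regionError stands in for *errorpb.Error; gcOneRegion stands in for
// doGCForOneRegion. Contract: at most one of (regionErr, err) is non-nil.
type regionError struct{ msg string }

func gcOneRegion(attempt int) (*regionError, error) {
	switch attempt {
	case 0:
		return &regionError{"not leader"}, nil // retriable: backoff, retry same range
	case 1:
		return nil, errors.New("timeout") // hard failure: count it, move on
	default:
		return nil, nil // success
	}
}

func main() {
	successRegions, failedRegions := 0, 0
	for attempt := 0; attempt < 3; attempt++ {
		regionErr, err := gcOneRegion(attempt)
		if regionErr != nil {
			// In the patch this is bo.Backoff(tikv.BoRegionMiss, ...); while the
			// backoff budget lasts, the same key range is retried.
			fmt.Println("region error, backing off and retrying:", regionErr.msg)
			continue
		}
		if err != nil {
			failedRegions++ // one failed region no longer aborts the whole job
		} else {
			successRegions++
		}
		// In the patch a fresh Backoffer is created here, so each region gets
		// its own GcOneRegionMaxBackoff budget instead of sharing one.
	}
	fmt.Printf("successful regions: %d, failed regions: %d\n", successRegions, failedRegions)
}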
diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go
index 5d783843690c0..ffbf00bcbe235 100644
--- a/store/tikv/gcworker/gc_worker_test.go
+++ b/store/tikv/gcworker/gc_worker_test.go
@@ -16,11 +16,15 @@ package gcworker
 import (
 	"flag"
 	"math"
+	"testing"
 	"time"
 
+	gofail "github.com/coreos/gofail/runtime"
 	. "github.com/pingcap/check"
+	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/tidb"
 	"github.com/pingcap/tidb/store/tikv"
+	goctx "golang.org/x/net/context"
 )
 
 var (
@@ -28,6 +32,10 @@ var (
 	pdAddrs  = flag.String("pd-addrs", "127.0.0.1:2379", "pd addrs")
 )
 
+func TestT(t *testing.T) {
+	TestingT(t)
+}
+
 type testGCWorkerSuite struct {
 	store  tikv.Storage
 	oracle *tikv.MockOracle
@@ -114,3 +122,32 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) {
 	c.Assert(err, IsNil)
 	s.timeEqual(c, safePoint.Add(time.Minute*30), now, 2*time.Second)
 }
+
+func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) {
+	ctx := goctx.Background()
+	bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx)
+	loc, err := s.store.GetRegionCache().LocateKey(bo, []byte(""))
+	c.Assert(err, IsNil)
+	var regionErr *errorpb.Error
+	regionErr, err = doGCForOneRegion(bo, s.store, 20, loc.Region)
+	c.Assert(regionErr, IsNil)
+	c.Assert(err, IsNil)
+
+	gofail.Enable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult", `return("timeout")`)
+	regionErr, err = doGCForOneRegion(bo, s.store, 20, loc.Region)
+	c.Assert(regionErr, IsNil)
+	c.Assert(err, NotNil)
+	gofail.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult")
+
+	gofail.Enable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult", `return("GCNotLeader")`)
+	regionErr, err = doGCForOneRegion(bo, s.store, 20, loc.Region)
+	c.Assert(regionErr.GetNotLeader(), NotNil)
+	c.Assert(err, IsNil)
+	gofail.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult")
+
+	gofail.Enable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult", `return("GCServerIsBusy")`)
+	regionErr, err = doGCForOneRegion(bo, s.store, 20, loc.Region)
+	c.Assert(regionErr.GetServerIsBusy(), NotNil)
+	c.Assert(err, IsNil)
+	gofail.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult")
+}
"github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/tidb" "github.com/pingcap/tidb/store/tikv" + goctx "golang.org/x/net/context" ) var ( @@ -28,6 +32,10 @@ var ( pdAddrs = flag.String("pd-addrs", "127.0.0.1:2379", "pd addrs") ) +func TestT(t *testing.T) { + TestingT(t) +} + type testGCWorkerSuite struct { store tikv.Storage oracle *tikv.MockOracle @@ -114,3 +122,32 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) { c.Assert(err, IsNil) s.timeEqual(c, safePoint.Add(time.Minute*30), now, 2*time.Second) } + +func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) { + ctx := goctx.Background() + bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx) + loc, err := s.store.GetRegionCache().LocateKey(bo, []byte("")) + c.Assert(err, IsNil) + var regionErr *errorpb.Error + regionErr, err = doGCForOneRegion(bo, s.store, 20, loc.Region) + c.Assert(regionErr, IsNil) + c.Assert(err, IsNil) + + gofail.Enable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult", `return("timeout")`) + regionErr, err = doGCForOneRegion(bo, s.store, 20, loc.Region) + c.Assert(regionErr, IsNil) + c.Assert(err, NotNil) + gofail.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult") + + gofail.Enable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult", `return("GCNotLeader")`) + regionErr, err = doGCForOneRegion(bo, s.store, 20, loc.Region) + c.Assert(regionErr.GetNotLeader(), NotNil) + c.Assert(err, IsNil) + gofail.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult") + + gofail.Enable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult", `return("GCServerIsBusy")`) + regionErr, err = doGCForOneRegion(bo, s.store, 20, loc.Region) + c.Assert(regionErr.GetServerIsBusy(), NotNil) + c.Assert(err, IsNil) + gofail.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult") +} diff --git a/store/tikv/gcworker/misc.go b/store/tikv/gcworker/misc.go index 49dd95fc30ca7..844a096489627 100644 --- a/store/tikv/gcworker/misc.go +++ b/store/tikv/gcworker/misc.go @@ -43,7 +43,7 @@ var ( Help: "Gauge of GC configs.", }, []string{"type"}) - gcFailureCounter = prometheus.NewCounterVec( + gcJobFailureCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "tidb", Subsystem: "tikvclient", @@ -51,7 +51,15 @@ var ( Help: "Counter of gc job failure.", }, []string{"type"}) - gcRegionTooMuchLocksCounter = prometheus.NewCounter( + gcActionRegionResultCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "tikvclient", + Name: "gc_action_result", + Help: "Counter of gc action result on region level.", + }, []string{"type"}) + + gcRegionTooManyLocksCounter = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "tidb", Subsystem: "tikvclient", @@ -64,6 +72,7 @@ func init() { prometheus.MustRegister(gcWorkerCounter) prometheus.MustRegister(gcConfigGauge) prometheus.MustRegister(gcHistogram) - prometheus.MustRegister(gcFailureCounter) - prometheus.MustRegister(gcRegionTooMuchLocksCounter) + prometheus.MustRegister(gcJobFailureCounter) + prometheus.MustRegister(gcActionRegionResultCounter) + prometheus.MustRegister(gcRegionTooManyLocksCounter) } diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index a5f59a29e3af7..dcf0533c1f152 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -60,6 +60,23 @@ func NewRegionRequestSender(regionCache *RegionCache, client Client) *RegionRequ // SendReq sends a request to tikv server. 
diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go
index a5f59a29e3af7..dcf0533c1f152 100644
--- a/store/tikv/region_request.go
+++ b/store/tikv/region_request.go
@@ -60,6 +60,23 @@ func NewRegionRequestSender(regionCache *RegionCache, client Client) *RegionRequ
 
 // SendReq sends a request to tikv server.
 func (s *RegionRequestSender) SendReq(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) {
+
+	// gofail: var tikvStoreSendReqResult string
+	// switch tikvStoreSendReqResult {
+	// case "timeout":
+	// 	return nil, errors.New("timeout")
+	// case "GCNotLeader":
+	// 	return &tikvrpc.Response{
+	// 		Type: tikvrpc.CmdGC,
+	// 		GC:   &kvrpcpb.GCResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}},
+	// 	}, nil
+	// case "GCServerIsBusy":
+	// 	return &tikvrpc.Response{
+	// 		Type: tikvrpc.CmdGC,
+	// 		GC:   &kvrpcpb.GCResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}},
+	// 	}, nil
+	// }
+
 	for {
 		ctx, err := s.regionCache.GetRPCContext(bo, regionID)
 		if err != nil {
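For readers unfamiliar with gofail: the `// gofail:` comment above compiles to nothing in normal builds; the gofail code generator rewrites it into real code guarded by a runtime registry, which the test toggles with gofail.Enable/Disable. A rough, self-contained sketch of that pattern (a simplified registry, not the real gofail runtime or its generated identifiers):

package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	mu         sync.Mutex
	failpoints = map[string]string{}
)

// enable and disable mimic gofail.Enable / gofail.Disable from the test.
func enable(name, val string) { mu.Lock(); failpoints[name] = val; mu.Unlock() }
func disable(name string)     { mu.Lock(); delete(failpoints, name); mu.Unlock() }

// sendReq mimics the injected switch at the top of SendReq: in a normal run
// the registry is empty and the real request path is taken; a test injects a
// value by name to force a specific return path.
func sendReq() (string, error) {
	mu.Lock()
	v := failpoints["tikvStoreSendReqResult"]
	mu.Unlock()
	switch v {
	case "timeout":
		return "", errors.New("timeout")
	case "GCNotLeader":
		return "region error: not leader", nil
	}
	return "ok", nil
}

func main() {
	fmt.Println(sendReq()) // ok <nil>
	enable("tikvStoreSendReqResult", "timeout")
	fmt.Println(sendReq()) // timeout error, as in the "timeout" test case
	disable("tikvStoreSendReqResult")
	fmt.Println(sendReq()) // back to the normal path
}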