store/tikv: Ignore error and do gc anyway (#5797)
* ignore the error when gc fails on one region, and continue gc on the next region
wentaoxu authored and ngaut committed Feb 7, 2018
1 parent 69881b5 commit c6046b2
Showing 6 changed files with 142 additions and 41 deletions.
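
The behavioral core of the commit is in doGC (store/tikv/gcworker/gc_worker.go): instead of aborting the whole GC job on the first per-region failure, the loop now records the failure and moves on to the next region. A condensed sketch of the new control flow; the locateRegion/gcRegion helpers are hypothetical stand-ins for the real region cache and RPC layer:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Hypothetical stand-ins for the region cache lookup and the per-region
// GC RPC; neither exists under these names in TiDB.
type region struct{ startKey, endKey string }

func locateRegion(key string) region { return region{startKey: key} }

func gcRegion(r region, safePoint uint64) error { return errors.New("not leader") }

// Sketch of the new doGC shape: before this commit the loop returned on the
// first per-region error; now it counts the failure and continues.
func doGC(safePoint uint64) {
	startTime := time.Now()
	successRegions, failedRegions := 0, 0
	var key string
	for {
		loc := locateRegion(key)
		if err := gcRegion(loc, safePoint); err != nil {
			failedRegions++ // previously: return err, aborting the whole job
		} else {
			successRegions++
		}
		key = loc.endKey
		if len(key) == 0 { // an empty end key marks the last region
			break
		}
	}
	fmt.Printf("finish gc: %d successful regions, %d failed regions, cost %s\n",
		successRegions, failedRegions, time.Since(startTime))
}

func main() { doGC(20) }
```

In support of this, the whole-job GcMaxBackoff (100s) gives way to a per-region GcOneRegionMaxBackoff (20s): doGC creates a fresh Backoffer for every region, so a persistently failing region exhausts only its own retry budget.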
2 changes: 1 addition & 1 deletion store/tikv/backoff.go
@@ -148,7 +148,7 @@ const (
getMaxBackoff = 20000
prewriteMaxBackoff = 20000
cleanupMaxBackoff = 20000
GcMaxBackoff = 100000
GcOneRegionMaxBackoff = 20000
GcResolveLockMaxBackoff = 100000
GcDeleteRangeMaxBackoff = 100000
rawkvMaxBackoff = 20000
1 change: 1 addition & 0 deletions store/tikv/client.go
@@ -52,6 +52,7 @@ const (
readTimeoutShort = 20 * time.Second // For requests that read/write several key-values.
ReadTimeoutMedium = 60 * time.Second // For requests that may need scan region.
ReadTimeoutLong = 150 * time.Second // For requests that may need scan region multiple times.
GCTimeout = 5 * time.Minute

grpcInitialWindowSize = 1 << 30
grpcInitialConnWindowSize = 1 << 30
109 changes: 73 additions & 36 deletions store/tikv/gcworker/gc_worker.go
@@ -22,6 +22,7 @@ import (
"time"

"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb"
"github.com/pingcap/tidb/ddl/util"
@@ -90,6 +91,7 @@ func (w *GCWorker) Close() {
const (
gcTimeFormat = "20060102-15:04:05 -0700 MST"
gcWorkerTickInterval = time.Minute
gcJobLogTickInterval = time.Minute * 10
gcWorkerLease = time.Minute * 2
gcLeaderUUIDKey = "tikv_gc_leader_uuid"
gcLeaderDescKey = "tikv_gc_leader_desc"
@@ -222,7 +224,7 @@ func (w *GCWorker) leaderTick(ctx goctx.Context) error {
}

w.gcIsRunning = true
log.Infof("[gc worker] %s starts GC job, safePoint: %v", w.uuid, safePoint)
log.Infof("[gc worker] %s starts the whole job, safePoint: %v", w.uuid, safePoint)
go w.runGCJob(ctx, safePoint)
return nil
}
@@ -316,21 +318,23 @@ func (w *GCWorker) runGCJob(ctx goctx.Context, safePoint uint64) {
gcWorkerCounter.WithLabelValues("run_job").Inc()
err := resolveLocks(ctx, w.store, safePoint, w.uuid)
if err != nil {
gcFailureCounter.WithLabelValues("resolve_lock").Inc()
log.Errorf("[gc worker] %s resolve locks returns an error %v", w.uuid, err)
gcJobFailureCounter.WithLabelValues("resolve_lock").Inc()
w.done <- errors.Trace(err)
return
}
err = w.deleteRanges(ctx, safePoint)
if err != nil {
gcFailureCounter.WithLabelValues("delete_range").Inc()
log.Errorf("[gc worker] %s delete range returns an error %v", w.uuid, err)
gcJobFailureCounter.WithLabelValues("delete_range").Inc()
w.done <- errors.Trace(err)
return
}
err = doGC(ctx, w.store, safePoint, w.uuid)
if err != nil {
gcFailureCounter.WithLabelValues("gc").Inc()
log.Error("do GC returns an error", err)
log.Errorf("[gc worker] %s do GC returns an error %v", w.uuid, err)
w.gcIsRunning = false
gcJobFailureCounter.WithLabelValues("gc").Inc()
w.done <- errors.Trace(err)
return
}
@@ -347,7 +351,7 @@ func (w *GCWorker) deleteRanges(ctx goctx.Context, safePoint uint64) error {
return errors.Trace(err)
}

bo := tikv.NewBackoffer(tikv.GcDeleteRangeMaxBackoff, goctx.Background())
bo := tikv.NewBackoffer(tikv.GcDeleteRangeMaxBackoff, ctx)
log.Infof("[gc worker] %s start delete %v ranges", w.uuid, len(ranges))
startTime := time.Now()
regions := 0
@@ -431,7 +435,7 @@ func resolveLocks(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier string) error {
Limit: gcScanLockLimit,
},
}
bo := tikv.NewBackoffer(tikv.GcResolveLockMaxBackoff, goctx.Background())
bo := tikv.NewBackoffer(tikv.GcResolveLockMaxBackoff, ctx)

log.Infof("[gc worker] %s start resolve locks, safePoint: %v.", identifier, safePoint)
startTime := time.Now()
@@ -498,7 +502,7 @@ }
}
} else {
log.Infof("[gc worker] %s, region %d has more than %d locks", identifier, loc.Region.GetID(), gcScanLockLimit)
gcRegionTooMuchLocksCounter.Inc()
gcRegionTooManyLocksCounter.Inc()
key = locks[len(locks)-1].Key
}
}
@@ -518,63 +522,96 @@ func doGC(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier string) error {
// Sleep to wait for all other tidb instances update their safepoint cache.
time.Sleep(gcSafePointCacheInterval)

req := &tikvrpc.Request{
Type: tikvrpc.CmdGC,
GC: &kvrpcpb.GCRequest{
SafePoint: safePoint,
},
}
bo := tikv.NewBackoffer(tikv.GcMaxBackoff, goctx.Background())

log.Infof("[gc worker] %s start gc, safePoint: %v.", identifier, safePoint)
startTime := time.Now()
regions := 0
successRegions := 0
failedRegions := 0

ticker := time.NewTicker(gcJobLogTickInterval)
defer ticker.Stop()

bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx)
var key []byte
for {
select {
case <-ctx.Done():
return errors.New("[gc worker] gc job canceled")
case <-ticker.C:
log.Infof("[gc worker] %s gc in process, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s",
identifier, safePoint, successRegions, failedRegions, time.Since(startTime))
default:
}

loc, err := store.GetRegionCache().LocateKey(bo, key)
if err != nil {
return errors.Trace(err)
}
resp, err := store.SendReq(bo, req, loc.Region, tikv.ReadTimeoutLong)
if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}

var regionErr *errorpb.Error
regionErr, err = doGCForOneRegion(bo, store, safePoint, loc.Region)

// Check regionErr first: 'regionErr' and 'err' are never returned together,
// so handling the region error before the ordinary error keeps the retry logic correct.
if regionErr != nil {
err = bo.Backoff(tikv.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
if err == nil {
continue
}
continue
}
gcResp := resp.GC
if gcResp == nil {
return errors.Trace(tikv.ErrBodyMissing)
}
if gcResp.GetError() != nil {
return errors.Errorf("unexpected gc error: %s", gcResp.GetError())

if err != nil {
failedRegions++
gcActionRegionResultCounter.WithLabelValues("fail").Inc()
log.Warnf("[gc worker] %s failed to do gc on region(%s, %s), ignore it", identifier, string(loc.StartKey), string(loc.EndKey))
} else {
successRegions++
gcActionRegionResultCounter.WithLabelValues("success").Inc()
}
regions++

key = loc.EndKey
if len(key) == 0 {
break
}
bo = tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx)
}
log.Infof("[gc worker] %s finish gc, safePoint: %v, regions: %v, cost time: %s", identifier, safePoint, regions, time.Since(startTime))
log.Infof("[gc worker] %s finish gc, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s",
identifier, safePoint, successRegions, failedRegions, time.Since(startTime))
gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds())
return nil
}

// doGCForOneRegion never returns a region error and an ordinary error together; see doGC for how the caller relies on this.
func doGCForOneRegion(bo *tikv.Backoffer, store tikv.Storage, safePoint uint64, region tikv.RegionVerID) (*errorpb.Error, error) {
req := &tikvrpc.Request{
Type: tikvrpc.CmdGC,
GC: &kvrpcpb.GCRequest{
SafePoint: safePoint,
},
}

resp, err := store.SendReq(bo, req, region, tikv.GCTimeout)
if err != nil {
return nil, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return nil, errors.Trace(err)
}
if regionErr != nil {
return regionErr, nil
}

gcResp := resp.GC
if gcResp == nil {
return nil, errors.Trace(tikv.ErrBodyMissing)
}
if gcResp.GetError() != nil {
return nil, errors.Errorf("unexpected gc error: %s", gcResp.GetError())
}

return nil, nil
}
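
Pulling the per-region RPC out into doGCForOneRegion also makes that contract unit-testable in isolation: TestDoGCForOneRegion below drives all four outcomes through it (success, a plain send error, and the retryable NotLeader and ServerIsBusy region errors) using gofail failpoints, with no real TiKV cluster required.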

func (w *GCWorker) checkLeader() (bool, error) {
gcWorkerCounter.WithLabelValues("check_leader").Inc()
session := createSession(w.store)
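Two quieter changes thread through gc_worker.go: every Backoffer is now built from the job's ctx rather than goctx.Background(), so cancelling the job actually interrupts backoff sleeps, and a gcJobLogTickInterval ticker reports progress from inside the GC loop without ever blocking it. A standalone sketch of that select idiom, with an illustrative one-second interval in place of the real ten minutes:

```go
package main

import (
	"errors"
	"fmt"
	"time"

	goctx "golang.org/x/net/context"
)

// Non-blocking progress logging: the ticker case fires periodically, the
// ctx case handles cancellation, and the default case keeps the work loop
// from ever waiting on either.
func run(ctx goctx.Context) error {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for done := 0; ; done++ {
		select {
		case <-ctx.Done():
			return errors.New("job canceled")
		case <-ticker.C:
			fmt.Printf("in progress, %d regions handled\n", done)
		default:
		}
		time.Sleep(300 * time.Millisecond) // stand-in for one region's work
		if done == 9 {
			return nil
		}
	}
}

func main() {
	fmt.Println(run(goctx.Background()))
}
```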
37 changes: 37 additions & 0 deletions store/tikv/gcworker/gc_worker_test.go
@@ -16,18 +16,26 @@ package gcworker
import (
"flag"
"math"
"testing"
"time"

gofail "github.com/coreos/gofail/runtime"
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/tidb"
"github.com/pingcap/tidb/store/tikv"
goctx "golang.org/x/net/context"
)

var (
withTiKV = flag.Bool("with-tikv", false, "run tests with TiKV cluster started. (not use the mock server)")
pdAddrs = flag.String("pd-addrs", "127.0.0.1:2379", "pd addrs")
)

func TestT(t *testing.T) {
TestingT(t)
}

type testGCWorkerSuite struct {
store tikv.Storage
oracle *tikv.MockOracle
@@ -114,3 +122,32 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) {
c.Assert(err, IsNil)
s.timeEqual(c, safePoint.Add(time.Minute*30), now, 2*time.Second)
}

func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) {
ctx := goctx.Background()
bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx)
loc, err := s.store.GetRegionCache().LocateKey(bo, []byte(""))
c.Assert(err, IsNil)
var regionErr *errorpb.Error
regionErr, err = doGCForOneRegion(bo, s.store, 20, loc.Region)
c.Assert(regionErr, IsNil)
c.Assert(err, IsNil)

gofail.Enable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult", `return("timeout")`)
regionErr, err = doGCForOneRegion(bo, s.store, 20, loc.Region)
c.Assert(regionErr, IsNil)
c.Assert(err, NotNil)
gofail.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult")

gofail.Enable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult", `return("GCNotLeader")`)
regionErr, err = doGCForOneRegion(bo, s.store, 20, loc.Region)
c.Assert(regionErr.GetNotLeader(), NotNil)
c.Assert(err, IsNil)
gofail.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult")

gofail.Enable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult", `return("GCServerIsBusy")`)
regionErr, err = doGCForOneRegion(bo, s.store, 20, loc.Region)
c.Assert(regionErr.GetServerIsBusy(), NotNil)
c.Assert(err, IsNil)
gofail.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult")
}
17 changes: 13 additions & 4 deletions store/tikv/gcworker/misc.go
@@ -43,15 +43,23 @@ var (
Help: "Gauge of GC configs.",
}, []string{"type"})

gcFailureCounter = prometheus.NewCounterVec(
gcJobFailureCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "tikvclient",
Name: "gc_failure",
Help: "Counter of gc job failure.",
}, []string{"type"})

gcRegionTooMuchLocksCounter = prometheus.NewCounter(
gcActionRegionResultCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "tikvclient",
Name: "gc_action_result",
Help: "Counter of gc action result on region level.",
}, []string{"type"})

gcRegionTooManyLocksCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "tikvclient",
@@ -64,6 +72,7 @@ func init() {
prometheus.MustRegister(gcWorkerCounter)
prometheus.MustRegister(gcConfigGauge)
prometheus.MustRegister(gcHistogram)
prometheus.MustRegister(gcFailureCounter)
prometheus.MustRegister(gcRegionTooMuchLocksCounter)
prometheus.MustRegister(gcJobFailureCounter)
prometheus.MustRegister(gcActionRegionResultCounter)
prometheus.MustRegister(gcRegionTooManyLocksCounter)
}
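
Of the metric changes above, the renames (gcFailureCounter to gcJobFailureCounter, gcRegionTooMuchLocksCounter to gcRegionTooManyLocksCounter) are cosmetic; the new piece is gcActionRegionResultCounter, which records per-region outcomes as label values on a single CounterVec rather than as two separate counters, keeping success and failure rates queryable side by side. A self-contained sketch of the same pattern, using the metric as defined above:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// One CounterVec, outcome encoded in the "type" label.
var regionResultCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "tidb",
		Subsystem: "tikvclient",
		Name:      "gc_action_result",
		Help:      "Counter of gc action result on region level.",
	}, []string{"type"})

func main() {
	prometheus.MustRegister(regionResultCounter)
	regionResultCounter.WithLabelValues("success").Inc() // one GC'd region
	regionResultCounter.WithLabelValues("fail").Inc()    // one skipped region
	fmt.Println("incremented per-region success and fail counters")
}
```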
17 changes: 17 additions & 0 deletions store/tikv/region_request.go
@@ -60,6 +60,23 @@ func NewRegionRequestSender(regionCache *RegionCache, client Client) *RegionRequestSender {

// SendReq sends a request to tikv server.
func (s *RegionRequestSender) SendReq(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) {

// gofail: var tikvStoreSendReqResult string
// switch tikvStoreSendReqResult {
// case "timeout":
// return nil, errors.New("timeout")
// case "GCNotLeader":
// return &tikvrpc.Response{
// Type: tikvrpc.CmdGC,
// GC: &kvrpcpb.GCResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}},
// }, nil
// case "GCServerIsBusy":
// return &tikvrpc.Response{
// Type: tikvrpc.CmdGC,
// GC: &kvrpcpb.GCResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}},
// }, nil
// }

for {
ctx, err := s.regionCache.GetRPCContext(bo, regionID)
if err != nil {
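The commented block above is a gofail failpoint: assuming the usual gofail workflow (running the `gofail enable` code generator over the package), the `// gofail: var tikvStoreSendReqResult string` comment is rewritten into live injection code, which tests then arm and disarm at runtime, exactly as TestDoGCForOneRegion does above. A minimal sketch of the test-side usage; withTimeoutFailpoint is a hypothetical helper:

```go
package gcworker_test

import (
	gofail "github.com/coreos/gofail/runtime"
)

// Arm the failpoint so SendReq returns a canned timeout error instead of
// issuing a real RPC, run the body, then disarm it again.
func withTimeoutFailpoint(body func()) {
	gofail.Enable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult", `return("timeout")`)
	defer gofail.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult")
	body()
}
```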
