store/tikv: Remove gcTaskWorker (#11033)
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
MyonKeminta authored Jul 4, 2019
1 parent 9eb2379 commit 0f1822a
Showing 5 changed files with 111 additions and 171 deletions.
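The change replaces the GC worker's hand-rolled gcTaskWorker pool with the shared tikv.RangeTaskRunner, so per-range handlers now return a tikv.RangeTaskStat (completed and failed region counts) instead of a bare int. Below is a minimal, self-contained Go sketch of that handler-plus-stat pattern using simplified stand-in types; the type and field names only mirror what appears in this diff (RangeTaskStat, CompletedRegions, FailedRegions) and are not the tikv package's actual definitions.

package main

import (
	"context"
	"fmt"
)

// RangeTaskStat is a stand-in for the stat struct used in the diff: a handler
// reports how many regions it finished and how many it gave up on.
type RangeTaskStat struct {
	CompletedRegions int
	FailedRegions    int
}

// KeyRange is a simplified stand-in for kv.KeyRange.
type KeyRange struct {
	StartKey, EndKey []byte
}

// A handler processes one sub-range and returns its per-range stat; this
// mirrors the handler closures built in the diff.
type rangeHandler func(ctx context.Context, r KeyRange) (RangeTaskStat, error)

// runSequentially feeds sub-ranges to the handler and accumulates stats,
// standing in for what a range-task runner does for its caller.
func runSequentially(ctx context.Context, ranges []KeyRange, h rangeHandler) (RangeTaskStat, error) {
	var total RangeTaskStat
	for _, r := range ranges {
		stat, err := h(ctx, r)
		total.CompletedRegions += stat.CompletedRegions
		total.FailedRegions += stat.FailedRegions
		if err != nil {
			return total, err
		}
	}
	return total, nil
}

func main() {
	handler := func(ctx context.Context, r KeyRange) (RangeTaskStat, error) {
		// Pretend every sub-range maps to exactly one region that succeeds.
		return RangeTaskStat{CompletedRegions: 1}, nil
	}
	ranges := []KeyRange{
		{StartKey: []byte("a"), EndKey: []byte("m")},
		{StartKey: []byte("m"), EndKey: []byte("z")},
	}
	total, err := runSequentially(context.Background(), ranges, handler)
	fmt.Println(total.CompletedRegions, total.FailedRegions, err)
}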
22 changes: 11 additions & 11 deletions store/tikv/delete_range.go
@@ -77,13 +77,13 @@ func (t *DeleteRangeTask) Execute(ctx context.Context) error {
}

// Execute performs the delete range operation.
func (t *DeleteRangeTask) sendReqOnRange(ctx context.Context, r kv.KeyRange) (int, error) {
func (t *DeleteRangeTask) sendReqOnRange(ctx context.Context, r kv.KeyRange) (RangeTaskStat, error) {
startKey, rangeEndKey := r.StartKey, r.EndKey
completedRegions := 0
var stat RangeTaskStat
for {
select {
case <-ctx.Done():
return completedRegions, errors.Trace(ctx.Err())
return stat, errors.Trace(ctx.Err())
default:
}

@@ -94,7 +94,7 @@ func (t *DeleteRangeTask) sendReqOnRange(ctx context.Context, r kv.KeyRange) (in
bo := NewBackoffer(ctx, deleteRangeOneRegionMaxBackoff)
loc, err := t.store.GetRegionCache().LocateKey(bo, startKey)
if err != nil {
return completedRegions, errors.Trace(err)
return stat, errors.Trace(err)
}

// Delete to the end of the region, except if it's the last region overlapping the range
@@ -115,31 +115,31 @@ func (t *DeleteRangeTask) sendReqOnRange(ctx context.Context, r kv.KeyRange) (in

resp, err := t.store.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
if err != nil {
return completedRegions, errors.Trace(err)
return stat, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return completedRegions, errors.Trace(err)
return stat, errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return completedRegions, errors.Trace(err)
return stat, errors.Trace(err)
}
continue
}
deleteRangeResp := resp.DeleteRange
if deleteRangeResp == nil {
return completedRegions, errors.Trace(ErrBodyMissing)
return stat, errors.Trace(ErrBodyMissing)
}
if err := deleteRangeResp.GetError(); err != "" {
return completedRegions, errors.Errorf("unexpected delete range err: %v", err)
return stat, errors.Errorf("unexpected delete range err: %v", err)
}
completedRegions++
stat.CompletedRegions++
startKey = endKey
}

return completedRegions, nil
return stat, nil
}

// CompletedRegions returns the number of regions that are affected by this delete range task
188 changes: 51 additions & 137 deletions store/tikv/gcworker/gc_worker.go
@@ -21,7 +21,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
@@ -100,7 +99,6 @@ const (
booleanFalse = "false"

gcWorkerTickInterval = time.Minute
gcJobLogTickInterval = time.Minute * 10
gcWorkerLease = time.Minute * 2
gcLeaderUUIDKey = "tikv_gc_leader_uuid"
gcLeaderDescKey = "tikv_gc_leader_desc"
@@ -796,7 +794,7 @@ func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64, concurren
zap.Int("concurrency", concurrency))
startTime := time.Now()

handler := func(ctx context.Context, r kv.KeyRange) (int, error) {
handler := func(ctx context.Context, r kv.KeyRange) (tikv.RangeTaskStat, error) {
return w.resolveLocksForRange(ctx, safePoint, r.StartKey, r.EndKey)
}

@@ -814,12 +812,12 @@ func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64, concurren
logutil.Logger(ctx).Info("[gc worker] finish resolve locks",
zap.String("uuid", w.uuid),
zap.Uint64("safePoint", safePoint),
zap.Int32("regions", runner.CompletedRegions()))
zap.Int("regions", runner.CompletedRegions()))
metrics.GCHistogram.WithLabelValues("resolve_locks").Observe(time.Since(startTime).Seconds())
return nil
}

func (w *GCWorker) resolveLocksForRange(ctx context.Context, safePoint uint64, startKey []byte, endKey []byte) (int, error) {
func (w *GCWorker) resolveLocksForRange(ctx context.Context, safePoint uint64, startKey []byte, endKey []byte) (tikv.RangeTaskStat, error) {
// for scan lock request, we must return all locks even if they are generated
// by the same transaction. because gc worker need to make sure all locks have been
// cleaned.
@@ -831,12 +829,12 @@ func (w *GCWorker) resolveLocksForRange(ctx context.Context, safePoint uint64, s
},
}

regions := 0
var stat tikv.RangeTaskStat
key := startKey
for {
select {
case <-ctx.Done():
return regions, errors.New("[gc worker] gc job canceled")
return stat, errors.New("[gc worker] gc job canceled")
default:
}

@@ -845,29 +843,29 @@ func (w *GCWorker) resolveLocksForRange(ctx context.Context, safePoint uint64, s
req.ScanLock.StartKey = key
loc, err := w.store.GetRegionCache().LocateKey(bo, key)
if err != nil {
return regions, errors.Trace(err)
return stat, errors.Trace(err)
}
resp, err := w.store.SendReq(bo, req, loc.Region, tikv.ReadTimeoutMedium)
if err != nil {
return regions, errors.Trace(err)
return stat, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return regions, errors.Trace(err)
return stat, errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(tikv.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return regions, errors.Trace(err)
return stat, errors.Trace(err)
}
continue
}
locksResp := resp.ScanLock
if locksResp == nil {
return regions, errors.Trace(tikv.ErrBodyMissing)
return stat, errors.Trace(tikv.ErrBodyMissing)
}
if locksResp.GetError() != nil {
return regions, errors.Errorf("unexpected scanlock error: %s", locksResp)
return stat, errors.Errorf("unexpected scanlock error: %s", locksResp)
}
locksInfo := locksResp.GetLocks()
locks := make([]*tikv.Lock, len(locksInfo))
@@ -877,18 +875,18 @@ func (w *GCWorker) resolveLocksForRange(ctx context.Context, safePoint uint64, s

ok, err1 := w.store.GetLockResolver().BatchResolveLocks(bo, locks, loc.Region)
if err1 != nil {
return regions, errors.Trace(err1)
return stat, errors.Trace(err1)
}
if !ok {
err = bo.Backoff(tikv.BoTxnLock, errors.Errorf("remain locks: %d", len(locks)))
if err != nil {
return regions, errors.Trace(err)
return stat, errors.Trace(err)
}
continue
}

if len(locks) < gcScanLockLimit {
regions++
stat.CompletedRegions++
key = loc.EndKey
} else {
logutil.Logger(ctx).Info("[gc worker] region has more than limit locks",
@@ -903,7 +901,7 @@ func (w *GCWorker) resolveLocksForRange(ctx context.Context, safePoint uint64, s
break
}
}
return regions, nil
return stat, nil
}

func (w *GCWorker) uploadSafePointToPD(ctx context.Context, safePoint uint64) error {
@@ -939,62 +937,18 @@ func (w *GCWorker) uploadSafePointToPD(ctx context.Context, safePoint uint64) er
return nil
}

type gcTask struct {
startKey []byte
endKey []byte
safePoint uint64
}

type gcTaskWorker struct {
identifier string
store tikv.Storage
taskCh chan *gcTask
wg *sync.WaitGroup
// successRegions and failedRegions use atomic to read and set.
successRegions *int32
failedRegions *int32
}

func newGCTaskWorker(store tikv.Storage, taskCh chan *gcTask, wg *sync.WaitGroup, identifer string, successRegions *int32, failedRegions *int32) *gcTaskWorker {
return &gcTaskWorker{
identifer,
store,
taskCh,
wg,
successRegions,
failedRegions,
}
}

func (w *gcTaskWorker) run() {
defer w.wg.Done()
for task := range w.taskCh {
err := w.doGCForRange(task.startKey, task.endKey, task.safePoint)
if err != nil {
logutil.BgLogger().Error("[gc worker] gc interrupted because get region error",
zap.String("uuid", w.identifier),
zap.Binary("startKey", task.startKey),
zap.Binary("endKey", task.endKey),
zap.Error(err))
}
}
}

func (w *gcTaskWorker) doGCForRange(startKey []byte, endKey []byte, safePoint uint64) error {
var successRegions int32
var failedRegions int32
func (w *GCWorker) doGCForRange(ctx context.Context, startKey []byte, endKey []byte, safePoint uint64) (tikv.RangeTaskStat, error) {
var stat tikv.RangeTaskStat
defer func() {
atomic.AddInt32(w.successRegions, successRegions)
atomic.AddInt32(w.failedRegions, failedRegions)
metrics.GCActionRegionResultCounter.WithLabelValues("success").Add(float64(successRegions))
metrics.GCActionRegionResultCounter.WithLabelValues("fail").Add(float64(failedRegions))
metrics.GCActionRegionResultCounter.WithLabelValues("success").Add(float64(stat.CompletedRegions))
metrics.GCActionRegionResultCounter.WithLabelValues("fail").Add(float64(stat.FailedRegions))
}()
key := startKey
for {
bo := tikv.NewBackoffer(context.Background(), tikv.GcOneRegionMaxBackoff)
bo := tikv.NewBackoffer(ctx, tikv.GcOneRegionMaxBackoff)
loc, err := w.store.GetRegionCache().LocateKey(bo, key)
if err != nil {
return errors.Trace(err)
return stat, errors.Trace(err)
}

var regionErr *errorpb.Error
@@ -1011,13 +965,13 @@ func (w *gcTaskWorker) doGCForRange(startKey []byte, endKey []byte, safePoint ui

if err != nil {
logutil.BgLogger().Warn("[gc worker]",
zap.String("uuid", w.identifier),
zap.String("uuid", w.uuid),
zap.String("gc for range", fmt.Sprintf("[%d, %d)", startKey, endKey)),
zap.Uint64("safePoint", safePoint),
zap.Error(err))
failedRegions++
stat.FailedRegions++
} else {
successRegions++
stat.CompletedRegions++
}

key = loc.EndKey
Expand All @@ -1026,12 +980,12 @@ func (w *gcTaskWorker) doGCForRange(startKey []byte, endKey []byte, safePoint ui
}
}

return nil
return stat, nil
}

// doGCForRegion used for gc for region.
// these two errors should not return together, for more, see the func 'doGC'
func (w *gcTaskWorker) doGCForRegion(bo *tikv.Backoffer, safePoint uint64, region tikv.RegionVerID) (*errorpb.Error, error) {
func (w *GCWorker) doGCForRegion(bo *tikv.Backoffer, safePoint uint64, region tikv.RegionVerID) (*errorpb.Error, error) {
req := &tikvrpc.Request{
Type: tikvrpc.CmdGC,
GC: &kvrpcpb.GCRequest{
@@ -1062,83 +1016,43 @@ func (w *gcTaskWorker) doGCForRegion(bo *tikv.Backoffer, safePoint uint64, regio
return nil, nil
}

func (w *GCWorker) genNextGCTask(bo *tikv.Backoffer, safePoint uint64, key kv.Key) (*gcTask, error) {
loc, err := w.store.GetRegionCache().LocateKey(bo, key)
if err != nil {
return nil, errors.Trace(err)
}

task := &gcTask{
startKey: key,
endKey: loc.EndKey,
safePoint: safePoint,
}
return task, nil
}

func (w *GCWorker) doGC(ctx context.Context, safePoint uint64, concurrency int) error {
metrics.GCWorkerCounter.WithLabelValues("do_gc").Inc()
logutil.Logger(ctx).Info("[gc worker]",
logutil.Logger(ctx).Info("[gc worker] start doing gc for all keys",
zap.String("uuid", w.uuid),
zap.Int("concurrency", concurrency),
zap.Uint64("safePoint", safePoint))
startTime := time.Now()
var successRegions int32
var failedRegions int32

ticker := time.NewTicker(gcJobLogTickInterval)
defer ticker.Stop()

// Create task queue and start task workers.
gcTaskCh := make(chan *gcTask, concurrency)
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
w := newGCTaskWorker(w.store, gcTaskCh, &wg, w.uuid, &successRegions, &failedRegions)
wg.Add(1)
go w.run()
}
runner := tikv.NewRangeTaskRunner(
"gc-runner",
w.store,
concurrency,
func(ctx context.Context, r kv.KeyRange) (tikv.RangeTaskStat, error) {
return w.doGCForRange(ctx, r.StartKey, r.EndKey, safePoint)
})

var key []byte
defer func() {
close(gcTaskCh)
wg.Wait()
logutil.Logger(ctx).Info("[gc worker]",
err := runner.RunOnRange(ctx, []byte(""), []byte(""))
if err != nil {
logutil.Logger(ctx).Warn("[gc worker] failed to do gc for all keys",
zap.String("uuid", w.uuid),
zap.Uint64("safePoint", safePoint),
zap.Int32("successful regions", atomic.LoadInt32(&successRegions)),
zap.Int32("failed regions", atomic.LoadInt32(&failedRegions)),
zap.Duration("total cost time", time.Since(startTime)))
metrics.GCHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds())
}()
zap.Int("concurrency", concurrency),
zap.Error(err))
return errors.Trace(err)
}

for {
select {
case <-ctx.Done():
return errors.New("[gc worker] gc job canceled")
case <-ticker.C:
logutil.Logger(ctx).Info("[gc worker]",
zap.String("gc in process", w.uuid),
zap.Uint64("safePoint", safePoint),
zap.Int32("successful regions", atomic.LoadInt32(&successRegions)),
zap.Int32("failed regions", atomic.LoadInt32(&failedRegions)),
zap.Duration("total cost time", time.Since(startTime)))
default:
}
successRegions := runner.CompletedRegions()
failedRegions := runner.FailedRegions()

bo := tikv.NewBackoffer(ctx, tikv.GcOneRegionMaxBackoff)
task, err := w.genNextGCTask(bo, safePoint, key)
if err != nil {
return errors.Trace(err)
}
if task != nil {
gcTaskCh <- task
key = task.endKey
}
logutil.Logger(ctx).Info("[gc worker] finished doing gc for all keys",
zap.String("uuid", w.uuid),
zap.Uint64("safePoint", safePoint),
zap.Int("successful regions", successRegions),
zap.Int("failed regions", failedRegions),
zap.Duration("total cost time", time.Since(startTime)))
metrics.GCHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds())

if len(key) == 0 {
return nil
}
}
return nil
}

func (w *GCWorker) checkLeader() (bool, error) {
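The removed gcTaskWorker plumbing (a gcTask channel, a WaitGroup, and atomic success/failure counters) is subsumed by the runner the new doGC constructs, which fans sub-ranges out across the requested number of workers and exposes the merged counts via CompletedRegions and FailedRegions. The following is a toy, self-contained model of that fan-out-and-merge behaviour; it assumes per-range stats are simply summed and is not the actual RangeTaskRunner implementation.

package main

import (
	"context"
	"fmt"
	"sync"
)

// RangeTaskStat and KeyRange are simplified stand-ins, not the tikv
// package's own definitions.
type RangeTaskStat struct {
	CompletedRegions int
	FailedRegions    int
}

type KeyRange struct {
	StartKey, EndKey []byte
}

// runConcurrently fans sub-ranges out to N workers and merges their stats,
// in the spirit of the runner that replaces the removed gcTaskWorker pool.
func runConcurrently(ctx context.Context, ranges []KeyRange, concurrency int,
	handle func(context.Context, KeyRange) (RangeTaskStat, error)) RangeTaskStat {

	taskCh := make(chan KeyRange)
	statCh := make(chan RangeTaskStat)
	var wg sync.WaitGroup

	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for r := range taskCh {
				stat, err := handle(ctx, r)
				if err != nil {
					// A real runner would surface the error; this toy just
					// counts the range as failed and keeps going.
					stat.FailedRegions++
				}
				statCh <- stat
			}
		}()
	}

	go func() {
		for _, r := range ranges {
			taskCh <- r
		}
		close(taskCh)
		wg.Wait()
		close(statCh)
	}()

	var total RangeTaskStat
	for s := range statCh {
		total.CompletedRegions += s.CompletedRegions
		total.FailedRegions += s.FailedRegions
	}
	return total
}

func main() {
	ranges := []KeyRange{
		{[]byte("a"), []byte("h")},
		{[]byte("h"), []byte("p")},
		{[]byte("p"), []byte("z")},
	}
	total := runConcurrently(context.Background(), ranges, 2,
		func(ctx context.Context, r KeyRange) (RangeTaskStat, error) {
			return RangeTaskStat{CompletedRegions: 1}, nil
		})
	fmt.Printf("completed=%d failed=%d\n", total.CompletedRegions, total.FailedRegions)
}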