Skip to content

Commit

Permalink
cherry pick pingcap#17568 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: sre-bot <sre-bot@pingcap.com>
  • Loading branch information
hunterlxt authored and sre-bot committed Jun 2, 2020
1 parent 23b94af commit d8f5df1
Showing 1 changed file with 18 additions and 4 deletions.
22 changes: 18 additions & 4 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,7 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
respCh = task.respChan
}

bo := NewBackoffer(ctx, copNextMaxBackoff).WithVars(worker.vars)
worker.handleTask(bo, task, respCh)
worker.handleTask(ctx, task, respCh)
close(task.respChan)
select {
case <-worker.finishCh:
Expand Down Expand Up @@ -654,8 +653,20 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
return resp, nil
}

// Associate each region with an independent backoffer. In this way, when multiple regions are
// unavailable, TiDB can execute very quickly without blocking
func chooseBackoffer(ctx context.Context, backoffermap map[uint64]*Backoffer, task *copTask, worker *copIteratorWorker) *Backoffer {
bo, ok := backoffermap[task.region.id]
if ok {
return bo
}
newbo := NewBackoffer(ctx, copNextMaxBackoff).WithVars(worker.vars)
backoffermap[task.region.id] = newbo
return newbo
}

// handleTask handles single copTask, sends the result to channel, retry automatically on error.
func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh chan<- *copResponse) {
func (worker *copIteratorWorker) handleTask(ctx context.Context, task *copTask, respCh chan<- *copResponse) {
defer func() {
r := recover()
if r != nil {
Expand All @@ -668,8 +679,11 @@ func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh
}
}()
remainTasks := []*copTask{task}
backoffermap := make(map[uint64]*Backoffer)
for len(remainTasks) > 0 {
tasks, err := worker.handleTaskOnce(bo, remainTasks[0], respCh)
curTask := remainTasks[0]
bo := chooseBackoffer(ctx, backoffermap, curTask, worker)
tasks, err := worker.handleTaskOnce(bo, curTask, respCh)
if err != nil {
resp := &copResponse{err: errors.Trace(err)}
worker.sendToRespCh(resp, respCh, true)
Expand Down

0 comments on commit d8f5df1

Please sign in to comment.