Skip to content

Commit

Permalink
store/tikv: add log for slow coprocessor task (pingcap#2524)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored Jan 21, 2017
1 parent 0d97442 commit dc2ec28
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 4 deletions.
9 changes: 9 additions & 0 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ type Backoffer struct {
totalSleep int
errors []error
ctx context.Context
types []backoffType
}

// NewBackoffer creates a Backoffer with maximum sleep time(in ms).
Expand Down Expand Up @@ -165,6 +166,7 @@ func (b *Backoffer) Backoff(typ backoffType, err error) error {
}

b.totalSleep += f()
b.types = append(b.types, typ)

log.Debugf("%v, retry later(totalSleep %dms, maxSleep %dms)", err, b.totalSleep, b.maxSleep)
b.errors = append(b.errors, err)
Expand All @@ -181,6 +183,13 @@ func (b *Backoffer) Backoff(typ backoffType, err error) error {
return nil
}

func (b *Backoffer) String() string {
if b.totalSleep == 0 {
return ""
}
return fmt.Sprintf(" backoff(%dms %#s)", b.totalSleep, b.types)
}

// Fork creates a new Backoffer which keeps current Backoffer's sleep time and errors.
func (b *Backoffer) Fork() *Backoffer {
return &Backoffer{
Expand Down
21 changes: 17 additions & 4 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,15 @@ type copTask struct {
region RegionVerID
ranges *copRanges

status int
idx int // Index of task in the tasks slice.
respChan chan *coprocessor.Response
status int
idx int // Index of task in the tasks slice.
respChan chan *coprocessor.Response
storeAddr string
}

func (r *copTask) String() string {
return fmt.Sprintf("region(%d %d %d) ranges(%d) store(%s)",
r.region.id, r.region.confVer, r.region.ver, r.ranges.len(), r.storeAddr)
}

// copRanges is like []kv.KeyRange, but may has extra elements at head/tail.
Expand Down Expand Up @@ -301,6 +307,8 @@ type copIterator struct {
errChan chan error
}

const minLogCopTaskTime = 50 * time.Millisecond

// Pick the next new copTask and send request to tikv-server.
func (it *copIterator) work() {
for {
Expand All @@ -326,7 +334,11 @@ func (it *copIterator) work() {
bo := NewBackoffer(copNextMaxBackoff, context.Background())
startTime := time.Now()
resp, err := it.handleTask(bo, task)
coprocessorHistogram.Observe(time.Since(startTime).Seconds())
costTime := time.Since(startTime)
if costTime > minLogCopTaskTime {
log.Infof("[TIME_COP_TASK] %s%s %s", costTime, bo, task)
}
coprocessorHistogram.Observe(costTime.Seconds())
if bo.totalSleep > 0 {
backoffHistogram.Observe(float64(bo.totalSleep) / 1000)
}
Expand Down Expand Up @@ -464,6 +476,7 @@ func (it *copIterator) handleTask(bo *Backoffer, task *copTask) (*coprocessor.Re
log.Warnf("coprocessor err: %v", err)
return nil, errors.Trace(err)
}
task.storeAddr = sender.storeAddr
return resp, nil
}
}
Expand Down
2 changes: 2 additions & 0 deletions store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type RegionRequestSender struct {
bo *Backoffer
regionCache *RegionCache
client Client
storeAddr string
}

// NewRegionRequestSender creates a new sender.
Expand Down Expand Up @@ -118,6 +119,7 @@ func (s *RegionRequestSender) SendCopReq(req *coprocessor.Request, regionID Regi
}, nil
}

s.storeAddr = ctx.Addr
resp, retry, err := s.sendCopReqToRegion(ctx, req, timeout)
if err != nil {
return nil, errors.Trace(err)
Expand Down

0 comments on commit dc2ec28

Please sign in to comment.