Skip to content

Commit

Permalink
Organize and enhance the code of RunawayChecker
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Aug 9, 2024
1 parent fd3ad81 commit ab40a04
Showing 1 changed file with 82 additions and 91 deletions.
173 changes: 82 additions & 91 deletions pkg/domain/resourcegroup/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,27 +53,6 @@ const (
// NullTime is a zero time.Time.
var NullTime time.Time

// RunawayMatchType is used to indicate whether query was interrupted by runaway identification or quarantine watch.
type RunawayMatchType uint

const (
// RunawayMatchTypeWatch shows quarantine watch.
RunawayMatchTypeWatch RunawayMatchType = iota
// RunawayMatchTypeIdentify shows identification.
RunawayMatchTypeIdentify
)

func (t RunawayMatchType) String() string {
switch t {
case RunawayMatchTypeWatch:
return "watch"
case RunawayMatchTypeIdentify:
return "identify"
default:
panic("unknown type")
}
}

// RunawayRecord is used to save records which will be insert into mysql.tidb_runaway_queries.
type RunawayRecord struct {
ResourceGroupName string
Expand Down Expand Up @@ -399,7 +378,7 @@ func (rm *RunawayManager) getWatchFromWatchList(key string) *QuarantineRecord {
return nil
}

func (rm *RunawayManager) markRunaway(resourceGroupName, originalSQL, planDigest string, action string, matchType RunawayMatchType, now *time.Time) {
func (rm *RunawayManager) markRunaway(resourceGroupName, originalSQL, planDigest, action, matchType string, now *time.Time) {
source := rm.serverID
if !rm.syncerInitialized.Load() {
rm.logOnce.Do(func() {
Expand All @@ -411,7 +390,7 @@ func (rm *RunawayManager) markRunaway(resourceGroupName, originalSQL, planDigest
case rm.runawayQueriesChan <- &RunawayRecord{
ResourceGroupName: resourceGroupName,
Time: *now,
Match: matchType.String(),
Match: matchType,
Action: action,
SQLText: originalSQL,
PlanDigest: planDigest,
Expand Down Expand Up @@ -470,26 +449,31 @@ type RunawayChecker struct {
planDigest string

deadline time.Time
setting *rmpb.RunawaySettings
// From the group runaway settings, which will be applied when a query lacks a specified watch rule.
settings *rmpb.RunawaySettings

markedByRule atomic.Bool
// markedByRule is set to true when the query matches the group runaway settings.
markedByRule atomic.Bool
// markedByWatch is set to true when the query matches the specified watch rules.
markedByWatch bool
watchAction rmpb.RunawayAction
}

func newRunawayChecker(manager *RunawayManager, resourceGroupName string, setting *rmpb.RunawaySettings, originalSQL, sqlDigest, planDigest string, startTime time.Time) *RunawayChecker {
func newRunawayChecker(
manager *RunawayManager,
resourceGroupName string, settings *rmpb.RunawaySettings,
originalSQL, sqlDigest, planDigest string, startTime time.Time,
) *RunawayChecker {
c := &RunawayChecker{
manager: manager,
resourceGroupName: resourceGroupName,
originalSQL: originalSQL,
sqlDigest: sqlDigest,
planDigest: planDigest,
setting: setting,
markedByRule: atomic.Bool{},
markedByWatch: false,
settings: settings,
}
if setting != nil {
c.deadline = startTime.Add(time.Duration(setting.Rule.ExecElapsedTimeMs) * time.Millisecond)
if settings != nil {
c.deadline = startTime.Add(time.Duration(settings.Rule.ExecElapsedTimeMs) * time.Millisecond)
}
return c
}
Expand All @@ -499,50 +483,48 @@ func (r *RunawayChecker) BeforeExecutor() error {
if r == nil {
return nil
}
// Check if the query matches any specified watch rules.
for _, convict := range r.getConvictIdentifiers() {
watched, action := r.manager.examineWatchList(r.resourceGroupName, convict)
if watched {
if action == rmpb.RunawayAction_NoneAction && r.setting != nil {
action = r.setting.Action
}
r.markedByWatch = true
now := time.Now()
r.watchAction = action
r.markRunaway(RunawayMatchTypeWatch, action, &now)
// If no match action, it will do nothing.
switch action {
case rmpb.RunawayAction_Kill:
return exeerrors.ErrResourceGroupQueryRunawayQuarantine
case rmpb.RunawayAction_CoolDown:
// This action should be done in BeforeCopRequest.
return nil
case rmpb.RunawayAction_DryRun:
return nil
default:
}
if !watched {
continue
}
// Use the group runaway settings if none are provided.
if action == rmpb.RunawayAction_NoneAction && r.settings != nil {
action = r.settings.Action
}
// Mark it if this is the first time being watched.
r.markRunawayByWatch(action)
// Take action if needed.
switch action {
case rmpb.RunawayAction_Kill:
return exeerrors.ErrResourceGroupQueryRunawayQuarantine
case rmpb.RunawayAction_CoolDown:
// This action should be done in BeforeCopRequest.
return nil
default:
}
}
return nil
}

// BeforeCopRequest checks runaway and modifies the request if necessary before sending coprocessor request.
func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error {
if r.setting == nil && !r.markedByWatch {
// If the group settings are not available and it's not marked by watch, skip this part.
if r.settings == nil && !r.markedByWatch {
return nil
}
marked := r.markedByRule.Load()
if !marked {
// note: now we don't check whether query is in watch list again.
if r.markedByWatch {
if r.watchAction == rmpb.RunawayAction_CoolDown {
req.ResourceControlContext.OverridePriority = 1 // set priority to lowest
}
}

// If it's marked by watch and the action is cooldown, override the priority,
if r.markedByWatch && r.watchAction == rmpb.RunawayAction_CoolDown {
req.ResourceControlContext.OverridePriority = 1 // set priority to lowest
}
// If group settings are available and the query is not marked by a rule,
// verify if it matches any rules in the settings.
if r.settings != nil && !r.markedByRule.Load() {
now := time.Now()
until := r.deadline.Sub(now)
if until > 0 {
if r.setting.Action == rmpb.RunawayAction_Kill {
if r.settings.Action == rmpb.RunawayAction_Kill {
// if the execution time is close to the threshold, set a timeout
if until < tikv.ReadTimeoutMedium {
req.Context.MaxExecutionDurationMs = uint64(until.Milliseconds())
Expand All @@ -551,39 +533,30 @@ func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error {
return nil
}
// execution time exceeds the threshold, mark the query as runaway
if r.markedByRule.CompareAndSwap(false, true) {
r.markRunaway(RunawayMatchTypeIdentify, r.setting.Action, &now)
if !r.markedByWatch {
r.markQuarantine(&now)
}
r.markRunawayByIdentify(r.settings.Action, &now)
// Take action if needed.
switch r.settings.Action {
case rmpb.RunawayAction_Kill:
return exeerrors.ErrResourceGroupQueryRunawayInterrupted
case rmpb.RunawayAction_CoolDown:
req.ResourceControlContext.OverridePriority = 1 // set priority to lowest
return nil
default:
return nil
}
}
switch r.setting.Action {
case rmpb.RunawayAction_Kill:
return exeerrors.ErrResourceGroupQueryRunawayInterrupted
case rmpb.RunawayAction_CoolDown:
req.ResourceControlContext.OverridePriority = 1 // set priority to lowest
return nil
case rmpb.RunawayAction_DryRun:
return nil
default:
return nil
}
return nil
}

// CheckCopRespError checks TiKV error after receiving coprocessor response.
func (r *RunawayChecker) CheckCopRespError(err error) error {
if err == nil || r.setting == nil || r.setting.Action != rmpb.RunawayAction_Kill {
if err == nil || r.settings == nil || r.settings.Action != rmpb.RunawayAction_Kill {
return err
}
if strings.HasPrefix(err.Error(), "Coprocessor task terminated due to exceeding the deadline") {
if !r.markedByRule.Load() {
now := time.Now()
if r.deadline.Before(now) && r.markedByRule.CompareAndSwap(false, true) {
r.markRunaway(RunawayMatchTypeIdentify, r.setting.Action, &now)
if !r.markedByWatch {
r.markQuarantine(&now)
}
if r.deadline.Before(now) && r.markRunawayByIdentify(r.settings.Action, &now) {
return exeerrors.ErrResourceGroupQueryRunawayInterrupted
}
}
Expand All @@ -596,25 +569,43 @@ func (r *RunawayChecker) CheckCopRespError(err error) error {
}

func (r *RunawayChecker) markQuarantine(now *time.Time) {
if r.setting.Watch == nil {
if r.settings == nil || r.settings.Watch == nil {
return
}
ttl := time.Duration(r.setting.Watch.LastingDurationMs) * time.Millisecond
ttl := time.Duration(r.settings.Watch.LastingDurationMs) * time.Millisecond

r.manager.markQuarantine(r.resourceGroupName, r.getSettingConvictIdentifier(), r.settings.Watch.Type, r.settings.Action, ttl, now)
}

func (r *RunawayChecker) markRunawayByIdentify(action rmpb.RunawayAction, now *time.Time) bool {
swapped := r.markedByRule.CompareAndSwap(false, true)
if swapped {
r.markRunaway("identify", action, now)
if !r.markedByWatch {
r.markQuarantine(now)
}
}
return swapped
}

r.manager.markQuarantine(r.resourceGroupName, r.getSettingConvictIdentifier(), r.setting.Watch.Type, r.setting.Action, ttl, now)
func (r *RunawayChecker) markRunawayByWatch(action rmpb.RunawayAction) {
r.markedByWatch = true
r.watchAction = action
now := time.Now()
r.markRunaway("watch", action, &now)
}

func (r *RunawayChecker) markRunaway(matchType RunawayMatchType, action rmpb.RunawayAction, now *time.Time) {
actionStr := strings.ToLower(rmpb.RunawayAction_name[int32(action)])
metrics.RunawayCheckerCounter.WithLabelValues(r.resourceGroupName, matchType.String(), actionStr).Inc()
func (r *RunawayChecker) markRunaway(matchType string, action rmpb.RunawayAction, now *time.Time) {
actionStr := strings.ToLower(action.String())
metrics.RunawayCheckerCounter.WithLabelValues(r.resourceGroupName, matchType, actionStr).Inc()
r.manager.markRunaway(r.resourceGroupName, r.originalSQL, r.planDigest, actionStr, matchType, now)
}

func (r *RunawayChecker) getSettingConvictIdentifier() string {
if r.setting.Watch == nil {
if r == nil || r.settings == nil || r.settings.Watch == nil {
return ""
}
switch r.setting.Watch.Type {
switch r.settings.Watch.Type {
case rmpb.RunawayWatchType_Plan:
return r.planDigest
case rmpb.RunawayWatchType_Similar:
Expand Down

0 comments on commit ab40a04

Please sign in to comment.