Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: record runaway and quarantine query #44654

Merged
merged 78 commits into from
Jun 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
b38cbd3
introduce runaway manager
Connor1996 May 31, 2023
0c29816
refactor
Connor1996 Jun 1, 2023
1ed89d4
update bazel
Connor1996 Jun 1, 2023
ce9038c
runaway
CabinfeverB Jun 2, 2023
a12c48f
update bazel
CabinfeverB Jun 2, 2023
5da907c
draft for watch
CabinfeverB Jun 6, 2023
1d430aa
draft for watch
CabinfeverB Jun 6, 2023
e530127
fix draft
CabinfeverB Jun 6, 2023
a7751c7
fix draft
CabinfeverB Jun 6, 2023
17ad887
differ error
CabinfeverB Jun 7, 2023
0d2ce5d
fix watch
CabinfeverB Jun 7, 2023
e55f323
merge master
CabinfeverB Jun 7, 2023
7ce6412
fix build
Connor1996 Jun 7, 2023
dc2e0ee
rename
Connor1996 Jun 7, 2023
1ca9233
Merge remote-tracking branch 'upstream/master' into runaway
Connor1996 Jun 7, 2023
2b1dde6
fix test
Connor1996 Jun 7, 2023
0dba575
fix test and init default resource group name
Connor1996 Jun 7, 2023
5c8dadd
address comment
Connor1996 Jun 7, 2023
0ab9e11
fix resource group name
Connor1996 Jun 7, 2023
c3adcb0
set timeout
Connor1996 Jun 8, 2023
3c5f58b
add test
CabinfeverB Jun 8, 2023
8003741
add test
CabinfeverB Jun 8, 2023
ac060d3
fix test
Connor1996 Jun 8, 2023
bfec95f
update kvproto
Connor1996 Jun 8, 2023
c9b2844
fix lint
Connor1996 Jun 8, 2023
cc82ac9
add more test cases
Connor1996 Jun 8, 2023
ec05ebc
update bazel
Connor1996 Jun 8, 2023
fc8e927
refactor
CabinfeverB Jun 12, 2023
5df043f
avoid duplicated mark
Connor1996 Jun 12, 2023
d89aeb9
add mysql.quarantine_watch
CabinfeverB Jun 13, 2023
fdb251d
Merge remote-tracking branch 'upstream/master' into runaway
Connor1996 Jun 13, 2023
0924371
update go.sum
Connor1996 Jun 14, 2023
0968e41
fix lint
Connor1996 Jun 14, 2023
4ad2ee5
skip error
Connor1996 Jun 14, 2023
7def989
go mod tidy
Connor1996 Jun 14, 2023
2e52028
update errors.toml
Connor1996 Jun 14, 2023
faf5512
strict CI config check
Connor1996 Jun 14, 2023
c742dcb
fix nil runaway manager
Connor1996 Jun 14, 2023
8d2a159
add mysql.quarantine_watch
CabinfeverB Jun 14, 2023
9c724af
fix test
Connor1996 Jun 14, 2023
0bf01dd
impl watch for resource manager client
Connor1996 Jun 15, 2023
d8256be
update bazel
Connor1996 Jun 15, 2023
032c9ec
update nogo ignore
Connor1996 Jun 15, 2023
a29c211
merge master
CabinfeverB Jun 15, 2023
603a145
update
glorv Jun 15, 2023
4ac0a69
merge 44339
CabinfeverB Jun 15, 2023
3bfa010
merge 44339
CabinfeverB Jun 15, 2023
5194b7f
merge master
CabinfeverB Jun 15, 2023
5fe37ef
Merge remote-tracking branch 'upstream/master' into runaway
Connor1996 Jun 15, 2023
608ad12
update client-go to fix test
Connor1996 Jun 15, 2023
5929766
address comment
Connor1996 Jun 15, 2023
0f02a43
update bazel
Connor1996 Jun 15, 2023
9382ffe
avoid data race
Connor1996 Jun 15, 2023
4a6f874
clean stale test
Connor1996 Jun 15, 2023
c98bb44
impl resource manager get default resource group
CabinfeverB Jun 15, 2023
75fe376
merge 44339
CabinfeverB Jun 16, 2023
10f5a3c
merge mast
CabinfeverB Jun 16, 2023
d450ba1
merge master
CabinfeverB Jun 16, 2023
7c8b9ff
accelerated test
CabinfeverB Jun 16, 2023
8b8853c
address comment
CabinfeverB Jun 16, 2023
eab651a
add watch
glorv Jun 16, 2023
1f26e17
address comment
CabinfeverB Jun 16, 2023
2abbeb1
address comment
CabinfeverB Jun 16, 2023
ba2d649
fix
glorv Jun 16, 2023
17ab59f
errdoc
CabinfeverB Jun 16, 2023
6dfccfb
fix test
CabinfeverB Jun 16, 2023
ebec1cc
Merge branch 'master' of https://github.com/pingcap/tidb into pr-44654
glorv Jun 16, 2023
03b450d
address comment
CabinfeverB Jun 16, 2023
e7d1e99
merge master
CabinfeverB Jun 16, 2023
dacfe4b
fix tolower
CabinfeverB Jun 16, 2023
9f1ef9b
fix tolower
CabinfeverB Jun 16, 2023
1e99898
merge master
CabinfeverB Jun 16, 2023
80f9bd2
merge master
CabinfeverB Jun 16, 2023
8732117
address comment
CabinfeverB Jun 17, 2023
259cb1f
address comment
CabinfeverB Jun 17, 2023
0ce2208
fix test
CabinfeverB Jun 17, 2023
d77a972
fix test
CabinfeverB Jun 17, 2023
c3b9d97
fix test
CabinfeverB Jun 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 123 additions & 1 deletion domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"math"
"math/rand"
"net"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -1219,6 +1220,7 @@ func (do *Domain) Init(
do.wg.Run(do.topNSlowQueryLoop, "topNSlowQueryLoop")
do.wg.Run(do.infoSyncerKeeper, "infoSyncerKeeper")
do.wg.Run(do.globalConfigSyncerKeeper, "globalConfigSyncerKeeper")
do.wg.Run(do.runawayRecordFlushLoop, "runawayRecordFlushLoop")
if !skipRegisterToDashboard {
do.wg.Run(do.topologySyncerKeeper, "topologySyncerKeeper")
}
Expand Down Expand Up @@ -1246,6 +1248,121 @@ func (do *Domain) SetOnClose(onClose func()) {
do.onClose = onClose
}

func (do *Domain) runawayRecordFlushLoop() {
defer util.Recover(metrics.LabelDomain, "runawayRecordFlushLoop", nil, false)
// this times is used to batch flushing rocords, with 1s duration,
// we can guarantee a watch record can be seen by the user within 1s.
timer := time.NewTimer(time.Second)
fired := false
recordCh := do.RunawayManager().RunawayRecordChan()
quarantineRecordCh := do.RunawayManager().QuarantineRecordChan()
flushThrehold := do.runawayManager.FlushThreshold()
records := make([]*resourcegroup.RunawayRecord, 0, flushThrehold)
quarantineRecords := make([]*resourcegroup.QuarantineRecord, 0)

flushRunawayRecords := func() {
if len(records) == 0 {
return
}
sql, params := genRunawayQueriesStmt(records)
if err := do.execFlushSQL(sql, params); err != nil {
logutil.BgLogger().Info("flush runaway records failed", zap.Error(err), zap.Int("count", len(records)))
}
records = records[:0]
}
flushQuarantineRecords := func() {
if len(quarantineRecords) == 0 {
return
}
sql, params := genQuarantineQueriesStmt(quarantineRecords)
if err := do.execFlushSQL(sql, params); err != nil {
logutil.BgLogger().Info("flush quarantine records failed", zap.Error(err), zap.Int("count", len(records)))
}
quarantineRecords = quarantineRecords[:0]
}

for {
select {
case <-do.exit:
return
case <-timer.C:
flushRunawayRecords()
fired = true
case r := <-quarantineRecordCh:
quarantineRecords = append(quarantineRecords, r)
// we expect quarantine record should not be triggered very often, so always
// flush as soon as possible.
if len(quarantineRecordCh) == 0 || len(quarantineRecords) >= flushThrehold {
flushQuarantineRecords()
}
case r := <-recordCh:
records = append(records, r)
if len(records) >= flushThrehold {
flushRunawayRecords()
} else if fired {
fired = false
// meet a new record, reset the timer.
glorv marked this conversation as resolved.
Show resolved Hide resolved
timer.Reset(time.Second)
}
}
}
}

// execFlushSQL runs the given parameterized SQL statement on a session taken
// from the system session pool and returns the execution error, if any.
//
// The session is returned to the pool only when it was acquired
// successfully; on a failed Get nothing is put back, so a nil session is
// never handed to the pool.
func (do *Domain) execFlushSQL(sql string, params []interface{}) error {
	se, err := do.sysSessionPool.Get()
	if err != nil {
		return errors.Annotate(err, "get session failed")
	}
	defer do.sysSessionPool.Put(se)

	exec := se.(sqlexec.RestrictedSQLExecutor)
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
	_, _, err = exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession},
		sql, params...,
	)
	return err
}

// genRunawayQueriesStmt builds a single multi-row insert statement for
// mysql.tidb_runaway_queries together with its bound arguments. Every record
// contributes one seven-placeholder tuple; the arguments are appended in the
// same order as the placeholders.
func genRunawayQueriesStmt(records []*resourcegroup.RunawayRecord) (string, []interface{}) {
	const rowPlaceholder = "(%?, %?, %?, %?, %?, %?, %?)"

	args := make([]interface{}, 0, len(records)*7)
	var stmt strings.Builder
	stmt.WriteString("insert into mysql.tidb_runaway_queries VALUES ")
	for i, rec := range records {
		// Tuples after the first one are comma separated.
		if i != 0 {
			stmt.WriteByte(',')
		}
		stmt.WriteString(rowPlaceholder)
		args = append(args,
			rec.ResourceGroupName,
			rec.Time,
			rec.Match,
			rec.Action,
			rec.SQLText,
			rec.PlanDigest,
			rec.From,
		)
	}
	return stmt.String(), args
}

// genQuarantineQueriesStmt builds a single multi-row insert statement for
// mysql.tidb_runaway_quarantined_watch together with its bound arguments.
// Every record contributes one six-placeholder tuple; the arguments are
// appended in the same order as the placeholders.
func genQuarantineQueriesStmt(records []*resourcegroup.QuarantineRecord) (string, []interface{}) {
	// Number of values bound per record; must match the placeholder tuple below.
	const valuesPerRecord = 6

	var builder strings.Builder
	params := make([]interface{}, 0, len(records)*valuesPerRecord)
	builder.WriteString("insert into mysql.tidb_runaway_quarantined_watch VALUES ")
	for count, r := range records {
		if count > 0 {
			builder.WriteByte(',')
		}
		builder.WriteString("(%?, %?, %?, %?, %?, %?)")
		params = append(params,
			r.ResourceGroupName,
			r.StartTime,
			r.EndTime,
			r.Watch,
			r.WatchText,
			r.From,
		)
	}
	return builder.String(), params
}

func (do *Domain) initResourceGroupsController(ctx context.Context, pdClient pd.Client) error {
if pdClient == nil {
logutil.BgLogger().Warn("cannot setup up resource controller, not using tikv storage")
Expand All @@ -1258,7 +1375,12 @@ func (do *Domain) initResourceGroupsController(ctx context.Context, pdClient pd.
return err
}
control.Start(ctx)
do.runawayManager = resourcegroup.NewRunawayManager(control)
serverInfo, err := infosync.GetServerInfo()
if err != nil {
return err
}
serverAddr := net.JoinHostPort(serverInfo.IP, strconv.Itoa(int(serverInfo.Port)))
do.runawayManager = resourcegroup.NewRunawayManager(control, serverAddr)
do.resourceGroupsController = control
return nil
}
Expand Down
153 changes: 135 additions & 18 deletions domain/resourcegroup/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package resourcegroup

import (
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -32,22 +34,73 @@ const (
// DefaultResourceGroupName is the default resource group name.
DefaultResourceGroupName = "default"
// MaxWaitDuration is the max duration to wait for acquiring token buckets.
MaxWaitDuration = time.Second * 30
MaxWaitDuration = time.Second * 30
maxWatchListCap = 10000
maxWatchRecordChannelSize = 1024
)

// RunawayMatchType indicates whether a query was interrupted by runaway
// identification or by a quarantine watch.
type RunawayMatchType uint

const (
	// RunawayMatchTypeWatch shows quarantine watch.
	RunawayMatchTypeWatch RunawayMatchType = iota
	// RunawayMatchTypeIdentify shows identification.
	RunawayMatchTypeIdentify
)

// String returns the lower-case name of the match type: "watch" or
// "identify". A value outside the declared constants yields "unknown"
// instead of panicking, so that formatting a record can never take down the
// caller.
func (t RunawayMatchType) String() string {
	switch t {
	case RunawayMatchTypeWatch:
		return "watch"
	case RunawayMatchTypeIdentify:
		return "identify"
	default:
		return "unknown"
	}
}

// RunawayRecord holds one row that will be inserted into mysql.tidb_runaway_queries.
type RunawayRecord struct {
// ResourceGroupName is the name of the resource group the query ran in.
ResourceGroupName string
// Time is the timestamp supplied by the caller when the query was marked.
Time time.Time
// Match is the string form of the RunawayMatchType that caught the query.
Match string
// Action is the action string passed in by the checker.
Action string
// SQLText is the original SQL text of the query.
SQLText string
// PlanDigest is the plan digest of the query.
PlanDigest string
// From is the server identifier of the RunawayManager that produced the record.
From string
}

// QuarantineRecord holds one row that will be inserted into mysql.tidb_runaway_quarantined_watch.
type QuarantineRecord struct {
// ResourceGroupName is the name of the resource group the watch belongs to.
ResourceGroupName string
// StartTime is the timestamp supplied by the caller when the watch was created.
StartTime time.Time
// EndTime is StartTime plus the watch's TTL.
EndTime time.Time
// Watch is the watch type string.
Watch string
// WatchText is the convict identifier that is being watched.
WatchText string
// From is the server identifier of the RunawayManager that produced the record.
From string
}

// RunawayManager is used to detect and record runaway queries.
type RunawayManager struct {
resourceGroupCtl *rmclient.ResourceGroupsController
watchList *ttlcache.Cache[string, struct{}]
queryLock sync.Mutex
resourceGroupCtl *rmclient.ResourceGroupsController
watchList *ttlcache.Cache[string, struct{}]
serverID string
runawayQueriesChan chan *RunawayRecord
quarantineChan chan *QuarantineRecord
}

// NewRunawayManager creates a new RunawayManager.
func NewRunawayManager(resourceGroupCtl *rmclient.ResourceGroupsController) *RunawayManager {
watchList := ttlcache.New[string, struct{}]()
func NewRunawayManager(resourceGroupCtl *rmclient.ResourceGroupsController, serverAddr string) *RunawayManager {
watchList := ttlcache.New[string, struct{}](ttlcache.WithCapacity[string, struct{}](maxWatchListCap))
go watchList.Start()
return &RunawayManager{
resourceGroupCtl: resourceGroupCtl,
watchList: watchList,
resourceGroupCtl: resourceGroupCtl,
watchList: watchList,
serverID: serverAddr,
runawayQueriesChan: make(chan *RunawayRecord, maxWatchRecordChannelSize),
quarantineChan: make(chan *QuarantineRecord, maxWatchRecordChannelSize),
}
}

Expand All @@ -61,17 +114,64 @@ func (rm *RunawayManager) DeriveChecker(resourceGroupName string, originalSQL st
if group.RunawaySettings == nil {
return nil
}

return newRunawayChecker(rm, resourceGroupName, group.RunawaySettings, originalSQL, planDigest)
}

// MarkRunaway marks the query as runaway.
func (rm *RunawayManager) MarkRunaway(resourceGroupName string, convict string, ttl time.Duration) {
rm.watchList.Set(resourceGroupName+"/"+convict, struct{}{}, ttl)
// markQuarantine puts the convict of the given resource group on the watch
// list for ttl (unless it is already there) and queues a QuarantineRecord
// describing the watch. The record is silently dropped when the quarantine
// channel is full. The action argument is currently not used.
func (rm *RunawayManager) markQuarantine(resourceGroupName, convict, watchType string, ttl time.Duration, action string, now *time.Time) {
	watchKey := resourceGroupName + "/" + convict
	// Check once without the lock, then again under it, before inserting.
	if rm.watchList.Get(watchKey) == nil {
		rm.queryLock.Lock()
		if rm.watchList.Get(watchKey) == nil {
			rm.watchList.Set(watchKey, struct{}{}, ttl)
		}
		rm.queryLock.Unlock()
	}

	record := &QuarantineRecord{
		ResourceGroupName: resourceGroupName,
		StartTime:         *now,
		EndTime:           now.Add(ttl),
		Watch:             watchType,
		WatchText:         convict,
		From:              rm.serverID,
	}
	// Non-blocking send: never stall the query path on a full channel.
	select {
	case rm.quarantineChan <- record:
	default:
		// TODO: add warning for discard flush records
	}
}
// markRunaway queues a RunawayRecord for the given query. The record is
// silently dropped when the runaway record channel is full.
func (rm *RunawayManager) markRunaway(resourceGroupName, originalSQL, planDigest string, action string, matchType RunawayMatchType, now *time.Time) {
	record := &RunawayRecord{
		ResourceGroupName: resourceGroupName,
		Time:              *now,
		Match:             matchType.String(),
		Action:            action,
		SQLText:           originalSQL,
		PlanDigest:        planDigest,
		From:              rm.serverID,
	}
	// Non-blocking send: never stall the query path on a full channel.
	select {
	case rm.runawayQueriesChan <- record:
	default:
		// TODO: add warning for discard flush records
	}
}

// FlushThreshold returns the number of buffered records that triggers a flush.
// It is half of maxWatchRecordChannelSize.
func (rm *RunawayManager) FlushThreshold() int {
return maxWatchRecordChannelSize / 2
}

// ExamineWatchList check whether the query is in watch list.
func (rm *RunawayManager) ExamineWatchList(resourceGroupName string, convict string) bool {
// RunawayRecordChan returns the receive-only channel on which queued
// RunawayRecords are delivered.
func (rm *RunawayManager) RunawayRecordChan() <-chan *RunawayRecord {
return rm.runawayQueriesChan
}

// QuarantineRecordChan returns the receive-only channel on which queued
// QuarantineRecords are delivered.
func (rm *RunawayManager) QuarantineRecordChan() <-chan *QuarantineRecord {
return rm.quarantineChan
}

// examineWatchList reports whether the convict of the given resource group is
// currently in the watch list. The lookup key is resourceGroupName + "/" + convict.
func (rm *RunawayManager) examineWatchList(resourceGroupName string, convict string) bool {
return rm.watchList.Get(resourceGroupName+"/"+convict) != nil
}

Expand All @@ -91,6 +191,7 @@ type RunawayChecker struct {

deadline time.Time
setting *rmpb.RunawaySettings
action string

marked atomic.Bool
}
Expand All @@ -104,6 +205,7 @@ func newRunawayChecker(manager *RunawayManager, resourceGroupName string, settin
deadline: time.Now().Add(time.Duration(setting.Rule.ExecElapsedTimeMs) * time.Millisecond),
setting: setting,
marked: atomic.Bool{},
action: strings.ToLower(setting.Action.String()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not get it from setting when needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to reduce the use of strings.ToLower.

}
}

Expand All @@ -112,9 +214,13 @@ func (r *RunawayChecker) BeforeExecutor() error {
if r == nil {
return nil
}
result := r.manager.ExamineWatchList(r.resourceGroupName, r.getConvictIdentifier())
result := r.manager.examineWatchList(r.resourceGroupName, r.getConvictIdentifier())
if result {
r.marked.Store(result)
if result {
now := time.Now()
r.markRunaway(RunawayMatchTypeWatch, &now)
}
switch r.setting.Action {
case rmpb.RunawayAction_Kill:
return exeerrors.ErrResourceGroupQueryRunawayQuarantine
Expand Down Expand Up @@ -144,7 +250,9 @@ func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error {
}
// execution time exceeds the threshold, mark the query as runaway
if r.marked.CompareAndSwap(false, true) {
r.markRunaway()
now := time.Now()
r.markRunaway(RunawayMatchTypeIdentify, &now)
r.markQuarantine(&now)
}
}
switch r.setting.Action {
Expand All @@ -166,16 +274,25 @@ func (r *RunawayChecker) AfterCopRequest() {
// Here only marks the query as runaway
if !r.marked.Load() && r.deadline.Before(time.Now()) {
if r.marked.CompareAndSwap(false, true) {
r.markRunaway()
now := time.Now()
r.markRunaway(RunawayMatchTypeIdentify, &now)
r.markQuarantine(&now)
}
}
}

func (r *RunawayChecker) markRunaway() {
func (r *RunawayChecker) markQuarantine(now *time.Time) {
if r.setting.Watch == nil {
return
}
r.manager.MarkRunaway(r.resourceGroupName, r.getConvictIdentifier(), time.Duration(r.setting.Watch.LastingDurationMs)*time.Millisecond)
watchType := strings.ToLower(r.setting.Watch.Type.String())
ttl := time.Duration(r.setting.Watch.LastingDurationMs) * time.Millisecond

r.manager.markQuarantine(r.resourceGroupName, r.getConvictIdentifier(), watchType, ttl, r.action, now)
}

// markRunaway forwards this checker's resource group name, original SQL, plan
// digest and action, together with the given match type and timestamp, to the
// manager's markRunaway.
func (r *RunawayChecker) markRunaway(matchType RunawayMatchType, now *time.Time) {
r.manager.markRunaway(r.resourceGroupName, r.originalSQL, r.planDigest, r.action, matchType, now)
}

func (r *RunawayChecker) getConvictIdentifier() string {
Expand Down
2 changes: 1 addition & 1 deletion executor/infoschema_cluster_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestTableStorageStats(t *testing.T) {
"test 2",
))
rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()
result := 49
result := 51
require.Len(t, rows, result)

// More tests about the privileges.
Expand Down
Loading