Skip to content

Commit

Permalink
*: global runaway watch by system table and impl executor for `query w…
Browse files Browse the repository at this point in the history
…atch` (#45465)

ref #43691
  • Loading branch information
CabinfeverB authored Aug 1, 2023
1 parent 2e8de8d commit c46da07
Show file tree
Hide file tree
Showing 23 changed files with 1,304 additions and 376 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -5741,13 +5741,13 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sha256 = "f6f930eb916b333cf7a615ce1f30afccc72e15bc86a08336061d190fb02bfb63",
strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20230724163613-ee4a4ff68ac3",
sha256 = "1ddca1fa78d880aebb15ecfd7028a1c0d7d717b4e3a59200aa13d8623aa867b0",
strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20230728080053-8a9db88bc88a",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230724163613-ee4a4ff68ac3.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230724163613-ee4a4ff68ac3.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230724163613-ee4a4ff68ac3.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230724163613-ee4a4ff68ac3.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230728080053-8a9db88bc88a.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230728080053-8a9db88bc88a.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230728080053-8a9db88bc88a.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230728080053-8a9db88bc88a.zip",
],
)
go_repository(
Expand Down
3 changes: 1 addition & 2 deletions ddl/resourcegroup/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ func NewGroupFromOptions(groupName string, options *model.ResourceGroupSettings)
if options.Runaway.Action == model.RunawayActionNone {
return nil, ErrUnknownResourceGroupRunawayAction
}
// because RunawayActionNone is only defined in tidb, sub 1.
runaway.Action = rmpb.RunawayAction(options.Runaway.Action - 1)
runaway.Action = rmpb.RunawayAction(options.Runaway.Action)
if options.Runaway.WatchType != model.WatchNone {
runaway.Watch = &rmpb.RunawayWatch{}
runaway.Watch.Type = rmpb.RunawayWatchType(options.Runaway.WatchType)
Expand Down
6 changes: 3 additions & 3 deletions ddl/tests/resourcegroup/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,17 +291,17 @@ func TestResourceGroupRunaway(t *testing.T) {
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, match_type from mysql.tidb_runaway_queries", nil,
testkit.Rows("rg2 select /*+ resource_group(rg2) */ * from t identify",
"rg2 select /*+ resource_group(rg2) */ * from t watch"), maxWaitDuration, tryInterval)
tk.MustQuery("select SQL_NO_CACHE resource_group_name, watch_text from mysql.tidb_runaway_quarantined_watch").
tk.MustQuery("select SQL_NO_CACHE resource_group_name, watch_text from mysql.tidb_runaway_watch").
Check(testkit.Rows("rg2 select /*+ resource_group(rg2) */ * from t"))

tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, time from mysql.tidb_runaway_queries", nil,
nil, maxWaitDuration, tryInterval)
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, watch_text, end_time from mysql.tidb_runaway_quarantined_watch", nil,
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, watch_text, end_time from mysql.tidb_runaway_watch", nil,
nil, maxWaitDuration, tryInterval)
err = tk.QueryToErr("select /*+ resource_group(rg3) */ * from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")
tk.MustGetErrCode("select /*+ resource_group(rg3) */ * from t", mysql.ErrResourceGroupQueryRunawayQuarantine)
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, watch_text from mysql.tidb_runaway_quarantined_watch", nil,
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, watch_text from mysql.tidb_runaway_watch", nil,
testkit.Rows("rg3 select /*+ resource_group(rg3) */ * from t"), maxWaitDuration, tryInterval)

tk.MustExec("alter resource group rg2 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' ACTION=COOLDOWN)")
Expand Down
2 changes: 1 addition & 1 deletion ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/tikvrpc"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)
Expand Down
2 changes: 2 additions & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"optimize_trace.go",
"plan_replayer.go",
"plan_replayer_dump.go",
"runaway.go",
"schema_checker.go",
"schema_validator.go",
"sysvar_cache.go",
Expand Down Expand Up @@ -84,6 +85,7 @@ go_library(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_pingcap_kvproto//pkg/resource_manager",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
255 changes: 2 additions & 253 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"math"
"math/rand"
"net"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -66,12 +65,9 @@ import (
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/sqlbuilder"
"github.com/pingcap/tidb/ttl/ttlworker"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/dbterror"
disttaskutil "github.com/pingcap/tidb/util/disttask"
"github.com/pingcap/tidb/util/domainutil"
Expand Down Expand Up @@ -165,6 +161,7 @@ type Domain struct {
historicalStatsWorker *HistoricalStatsWorker
ttlJobManager atomic.Pointer[ttlworker.JobManager]
runawayManager *resourcegroup.RunawayManager
runawaySyncer *runawaySyncer
resourceGroupsController *rmclient.ResourceGroupsController

serverID uint64
Expand Down Expand Up @@ -1243,6 +1240,7 @@ func (do *Domain) Init(
do.wg.Run(do.infoSyncerKeeper, "infoSyncerKeeper")
do.wg.Run(do.globalConfigSyncerKeeper, "globalConfigSyncerKeeper")
do.wg.Run(do.runawayRecordFlushLoop, "runawayRecordFlushLoop")
do.wg.Run(do.runawayWatchSyncLoop, "runawayWatchSyncLoop")
if !skipRegisterToDashboard {
do.wg.Run(do.topologySyncerKeeper, "topologySyncerKeeper")
}
Expand Down Expand Up @@ -1270,255 +1268,6 @@ func (do *Domain) SetOnClose(onClose func()) {
do.onClose = onClose
}

// Tunables used by the runaway record flush loop and the expired-row clean-up.
const (
// runawayRecordFluashInterval is the duration of the timer that triggers a
// flush of the batched runaway records.
runawayRecordFluashInterval = time.Second
// quarantineRecordGCInterval is the period of the ticker that triggers the
// clean-up of the quarantine watch table.
quarantineRecordGCInterval = time.Minute * 10
// runawayRecordGCInterval is the period of the ticker that triggers the
// clean-up of the tidb_runaway_queries table.
runawayRecordGCInterval = time.Hour * 24
// runawayRecordExpiredDuration is the expiry duration passed to
// deleteExpiredRows for the tidb_runaway_queries table.
runawayRecordExpiredDuration = time.Hour * 24 * 7

// runawayRecordGCBatchSize is the maximum number of rows handed to a single
// generated DELETE statement.
runawayRecordGCBatchSize = 100
// runawayRecordGCSelectBatchSize is the limit passed to the scan query
// generator for each generated scan statement.
runawayRecordGCSelectBatchSize = runawayRecordGCBatchSize * 5
)

// systemSchemaCIStr is the name of the schema ("mysql") in which the tables
// cleaned up by deleteExpiredRows are looked up.
var systemSchemaCIStr = model.NewCIStr("mysql")

// deleteExpiredRows cleans up the system table mysql.<tableName>, using the
// column colName as the time column and now-expiredDuration as the expiry
// time handed to the scan/delete SQL builders. It does nothing unless this
// node is the DDL owner. Rows are scanned in batches of
// runawayRecordGCSelectBatchSize and deleted in batches of
// runawayRecordGCBatchSize. All failures are logged; nothing is returned.
func (do *Domain) deleteExpiredRows(tableName, colName string, expiredDuration time.Duration) {
	if !do.DDL().OwnerManager().IsOwner() {
		return
	}
	failpoint.Inject("FastRunawayGC", func() {
		expiredDuration = time.Second * 1
	})
	expiredTime := time.Now().Add(-expiredDuration)
	tbCIStr := model.NewCIStr(tableName)
	tbl, err := do.InfoSchema().TableByName(systemSchemaCIStr, tbCIStr)
	if err != nil {
		logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
		return
	}
	tbInfo := tbl.Meta()
	col := tbInfo.FindPublicColumnByName(colName)
	if col == nil {
		logutil.BgLogger().Error("time column is not public in table", zap.String("table", tableName), zap.String("column", colName))
		return
	}
	tb, err := cache.NewBasePhysicalTable(systemSchemaCIStr, tbInfo, model.NewCIStr(""), col)
	if err != nil {
		logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
		return
	}
	generator, err := sqlbuilder.NewScanQueryGenerator(tb, expiredTime, nil, nil)
	if err != nil {
		logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
		return
	}
	var leftRows [][]types.Datum
	for {
		sql := ""
		if sql, err = generator.NextSQL(leftRows, runawayRecordGCSelectBatchSize); err != nil {
			logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
			return
		}
		// An empty statement ends the loop (presumably the generator has
		// nothing more to scan).
		if len(sql) == 0 {
			return
		}

		rows, sqlErr := do.execRestrictedSQL(sql, nil)
		if sqlErr != nil {
			// Report the error of the scan query itself; err is nil at this point.
			logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(sqlErr))
			return
		}
		leftRows = make([][]types.Datum, len(rows))
		for i, row := range rows {
			leftRows[i] = row.GetDatumRow(tb.KeyColumnTypes)
		}

		// Delete the scanned rows in chunks of at most runawayRecordGCBatchSize.
		for len(leftRows) > 0 {
			var delBatch [][]types.Datum
			if len(leftRows) < runawayRecordGCBatchSize {
				delBatch = leftRows
				leftRows = nil
			} else {
				delBatch = leftRows[0:runawayRecordGCBatchSize]
				leftRows = leftRows[runawayRecordGCBatchSize:]
			}
			sql, err := sqlbuilder.BuildDeleteSQL(tb, delBatch, expiredTime)
			if err != nil {
				logutil.BgLogger().Error(
					"build delete SQL failed when deleting system table",
					zap.Error(err),
					zap.String("table", tb.Schema.O+"."+tb.Name.O),
				)
				return
			}

			// A failed delete is only logged; the remaining batches are still tried.
			_, err = do.execRestrictedSQL(sql, nil)
			if err != nil {
				logutil.BgLogger().Error(
					"delete SQL failed when deleting system table", zap.Error(err), zap.String("SQL", sql),
				)
			}
		}
	}
}

// runawayRecordFlushLoop is a background loop that receives runaway records
// and quarantine records from the runaway manager, writes them to the system
// tables in batches, and periodically starts the clean-up of expired rows.
// It returns when do.exit is signalled.
func (do *Domain) runawayRecordFlushLoop() {
	defer util.Recover(metrics.LabelDomain, "runawayRecordFlushLoop", nil, false)

	// This timer is used to batch the flushing of records; with a 1s duration,
	// we can guarantee a watch record can be seen by the user within 1s.
	runawayRecordFlushTimer := time.NewTimer(runawayRecordFluashInterval)
	runawayRecordGCTicker := time.NewTicker(runawayRecordGCInterval)
	quarantineRecordGCTicker := time.NewTicker(quarantineRecordGCInterval)
	// Release the timer and tickers when the loop exits. The closure reads the
	// variables at exit time, so it also covers the replacements installed by
	// the failpoint below.
	defer func() {
		runawayRecordFlushTimer.Stop()
		runawayRecordGCTicker.Stop()
		quarantineRecordGCTicker.Stop()
	}()
	failpoint.Inject("FastRunawayGC", func() {
		runawayRecordFlushTimer.Stop()
		runawayRecordGCTicker.Stop()
		quarantineRecordGCTicker.Stop()
		runawayRecordFlushTimer = time.NewTimer(time.Millisecond * 50)
		runawayRecordGCTicker = time.NewTicker(time.Millisecond * 200)
		quarantineRecordGCTicker = time.NewTicker(time.Millisecond * 200)
	})

	// fired records that the flush timer has expired and must be reset before
	// it can fire again.
	fired := false
	recordCh := do.RunawayManager().RunawayRecordChan()
	quarantineRecordCh := do.RunawayManager().QuarantineRecordChan()
	flushThreshold := do.runawayManager.FlushThreshold()
	records := make([]*resourcegroup.RunawayRecord, 0, flushThreshold)
	quarantineRecords := make([]*resourcegroup.QuarantineRecord, 0)

	// flushRunawayRecords writes the buffered runaway records with a single
	// insert and empties the buffer, even when the insert fails.
	flushRunawayRecords := func() {
		if len(records) == 0 {
			return
		}
		sql, params := genRunawayQueriesStmt(records)
		if _, err := do.execRestrictedSQL(sql, params); err != nil {
			logutil.BgLogger().Error("flush runaway records failed", zap.Error(err), zap.Int("count", len(records)))
		}
		records = records[:0]
	}
	// flushQuarantineRecords does the same for the buffered quarantine records.
	flushQuarantineRecords := func() {
		if len(quarantineRecords) == 0 {
			return
		}
		sql, params := genQuarantineQueriesStmt(quarantineRecords)
		if _, err := do.execRestrictedSQL(sql, params); err != nil {
			logutil.BgLogger().Error("flush quarantine records failed", zap.Error(err), zap.Int("count", len(quarantineRecords)))
		}
		quarantineRecords = quarantineRecords[:0]
	}
	for {
		select {
		case <-do.exit:
			return
		case <-runawayRecordFlushTimer.C:
			flushRunawayRecords()
			fired = true
		case r := <-quarantineRecordCh:
			quarantineRecords = append(quarantineRecords, r)
			// we expect quarantine record should not be triggered very often, so always
			// flush as soon as possible.
			if len(quarantineRecordCh) == 0 || len(quarantineRecords) >= flushThreshold {
				flushQuarantineRecords()
			}
		case r := <-recordCh:
			records = append(records, r)
			failpoint.Inject("FastRunawayGC", func() {
				flushRunawayRecords()
			})
			if len(records) >= flushThreshold {
				flushRunawayRecords()
			} else if fired {
				fired = false
				// meet a new record, reset the timer.
				runawayRecordFlushTimer.Reset(runawayRecordFluashInterval)
			}
		case <-runawayRecordGCTicker.C:
			go do.deleteExpiredRows("tidb_runaway_queries", "time", runawayRecordExpiredDuration)
		case <-quarantineRecordGCTicker.C:
			go do.deleteExpiredRows("tidb_runaway_quarantined_watch", "end_time", time.Duration(0))
		}
	}
}

// execRestrictedSQL runs sql with params as a restricted internal statement
// on a session borrowed from do.sysSessionPool and returns the resulting rows.
// The session is handed back to the pool before returning. An error is
// returned when no session can be obtained or when the statement fails.
func (do *Domain) execRestrictedSQL(sql string, params []interface{}) ([]chunk.Row, error) {
	se, err := do.sysSessionPool.Get()
	if err != nil {
		// Nothing was borrowed, so nothing must be put back into the pool.
		return nil, errors.Annotate(err, "get session failed")
	}
	defer do.sysSessionPool.Put(se)

	exec := se.(sqlexec.RestrictedSQLExecutor)
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
	r, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession},
		sql, params...,
	)
	return r, err
}

// genRunawayQueriesStmt builds one multi-row insert statement for
// mysql.tidb_runaway_queries together with its parameter list. Every record
// contributes one "(%?, ...)" tuple of seven placeholders and seven values,
// in the order: resource group name, time, match, action, SQL text, plan
// digest, from.
func genRunawayQueriesStmt(records []*resourcegroup.RunawayRecord) (string, []interface{}) {
	const valuesPerRecord = 7

	var stmt strings.Builder
	args := make([]interface{}, 0, len(records)*valuesPerRecord)

	stmt.WriteString("insert into mysql.tidb_runaway_queries VALUES ")
	for idx := range records {
		rec := records[idx]
		// Tuples after the first one are separated by a comma.
		if idx != 0 {
			stmt.WriteByte(',')
		}
		stmt.WriteString("(%?, %?, %?, %?, %?, %?, %?)")
		args = append(args,
			rec.ResourceGroupName,
			rec.Time,
			rec.Match,
			rec.Action,
			rec.SQLText,
			rec.PlanDigest,
			rec.From,
		)
	}
	return stmt.String(), args
}

// genQuarantineQueriesStmt builds one multi-row insert statement for
// mysql.tidb_runaway_quarantined_watch together with its parameter list.
// Every record contributes one "(%?, ...)" tuple of six placeholders and six
// values, in the order: resource group name, start time, end time, watch,
// watch text, source.
func genQuarantineQueriesStmt(records []*resourcegroup.QuarantineRecord) (string, []interface{}) {
	// Number of values appended per record; must match the placeholder tuple below.
	const valuesPerRecord = 6

	var builder strings.Builder
	params := make([]interface{}, 0, len(records)*valuesPerRecord)
	builder.WriteString("insert into mysql.tidb_runaway_quarantined_watch VALUES ")
	for count, r := range records {
		if count > 0 {
			builder.WriteByte(',')
		}
		builder.WriteString("(%?, %?, %?, %?, %?, %?)")
		params = append(params,
			r.ResourceGroupName,
			r.StartTime,
			r.EndTime,
			r.Watch,
			r.WatchText,
			r.Source,
		)
	}
	return builder.String(), params
}

// initResourceGroupsController creates and starts the resource groups
// controller, builds the runaway manager on top of it, and installs the
// controller as the resource control interceptor. When pdClient is nil it
// only logs a warning and returns nil. An error is returned when the server
// info cannot be fetched or the controller cannot be created.
func (do *Domain) initResourceGroupsController(ctx context.Context, pdClient pd.Client) error {
	if pdClient == nil {
		logutil.BgLogger().Warn("cannot setup up resource controller, not using tikv storage")
		// return nil as unistore doesn't support it
		return nil
	}

	// Resolve the server address before the controller is created and started,
	// so that a failure here does not leave a started controller behind that
	// nothing references.
	serverInfo, err := infosync.GetServerInfo()
	if err != nil {
		return err
	}
	serverAddr := net.JoinHostPort(serverInfo.IP, strconv.Itoa(int(serverInfo.Port)))

	control, err := rmclient.NewResourceGroupController(ctx, do.ServerID(), pdClient, nil, rmclient.WithMaxWaitDuration(resourcegroup.MaxWaitDuration))
	if err != nil {
		return err
	}
	control.Start(ctx)
	do.runawayManager = resourcegroup.NewRunawayManager(control, serverAddr)
	do.resourceGroupsController = control
	tikv.SetResourceControlInterceptor(control)
	return nil
}

func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error {
cfg := config.GetGlobalConfig()
if pdClient == nil || do.etcdClient == nil {
Expand Down
Loading

0 comments on commit c46da07

Please sign in to comment.