Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: global runaway watch by system table and impl executor for query watch #45465

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -5741,13 +5741,13 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sha256 = "f6f930eb916b333cf7a615ce1f30afccc72e15bc86a08336061d190fb02bfb63",
strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20230724163613-ee4a4ff68ac3",
sha256 = "1ddca1fa78d880aebb15ecfd7028a1c0d7d717b4e3a59200aa13d8623aa867b0",
strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20230728080053-8a9db88bc88a",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230724163613-ee4a4ff68ac3.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230724163613-ee4a4ff68ac3.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230724163613-ee4a4ff68ac3.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230724163613-ee4a4ff68ac3.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230728080053-8a9db88bc88a.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230728080053-8a9db88bc88a.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230728080053-8a9db88bc88a.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230728080053-8a9db88bc88a.zip",
],
)
go_repository(
Expand Down
3 changes: 1 addition & 2 deletions ddl/resourcegroup/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ func NewGroupFromOptions(groupName string, options *model.ResourceGroupSettings)
if options.Runaway.Action == model.RunawayActionNone {
return nil, ErrUnknownResourceGroupRunawayAction
}
// because RunawayActionNone is only defined in tidb, sub 1.
runaway.Action = rmpb.RunawayAction(options.Runaway.Action - 1)
runaway.Action = rmpb.RunawayAction(options.Runaway.Action)
if options.Runaway.WatchType != model.WatchNone {
runaway.Watch = &rmpb.RunawayWatch{}
runaway.Watch.Type = rmpb.RunawayWatchType(options.Runaway.WatchType)
Expand Down
6 changes: 3 additions & 3 deletions ddl/tests/resourcegroup/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,17 +291,17 @@ func TestResourceGroupRunaway(t *testing.T) {
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, match_type from mysql.tidb_runaway_queries", nil,
testkit.Rows("rg2 select /*+ resource_group(rg2) */ * from t identify",
"rg2 select /*+ resource_group(rg2) */ * from t watch"), maxWaitDuration, tryInterval)
tk.MustQuery("select SQL_NO_CACHE resource_group_name, watch_text from mysql.tidb_runaway_quarantined_watch").
tk.MustQuery("select SQL_NO_CACHE resource_group_name, watch_text from mysql.tidb_runaway_watch").
Check(testkit.Rows("rg2 select /*+ resource_group(rg2) */ * from t"))

tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, time from mysql.tidb_runaway_queries", nil,
nil, maxWaitDuration, tryInterval)
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, watch_text, end_time from mysql.tidb_runaway_quarantined_watch", nil,
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, watch_text, end_time from mysql.tidb_runaway_watch", nil,
nil, maxWaitDuration, tryInterval)
err = tk.QueryToErr("select /*+ resource_group(rg3) */ * from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")
tk.MustGetErrCode("select /*+ resource_group(rg3) */ * from t", mysql.ErrResourceGroupQueryRunawayQuarantine)
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, watch_text from mysql.tidb_runaway_quarantined_watch", nil,
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, watch_text from mysql.tidb_runaway_watch", nil,
testkit.Rows("rg3 select /*+ resource_group(rg3) */ * from t"), maxWaitDuration, tryInterval)

tk.MustExec("alter resource group rg2 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' ACTION=COOLDOWN)")
Expand Down
2 changes: 1 addition & 1 deletion ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/tikvrpc"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
CabinfeverB marked this conversation as resolved.
Show resolved Hide resolved
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)
Expand Down
2 changes: 2 additions & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"optimize_trace.go",
"plan_replayer.go",
"plan_replayer_dump.go",
"runaway.go",
"schema_checker.go",
"schema_validator.go",
"sysvar_cache.go",
Expand Down Expand Up @@ -84,6 +85,7 @@ go_library(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_pingcap_kvproto//pkg/resource_manager",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
255 changes: 2 additions & 253 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"math"
"math/rand"
"net"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -66,12 +65,9 @@ import (
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/sqlbuilder"
"github.com/pingcap/tidb/ttl/ttlworker"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/dbterror"
disttaskutil "github.com/pingcap/tidb/util/disttask"
"github.com/pingcap/tidb/util/domainutil"
Expand Down Expand Up @@ -165,6 +161,7 @@ type Domain struct {
historicalStatsWorker *HistoricalStatsWorker
ttlJobManager atomic.Pointer[ttlworker.JobManager]
runawayManager *resourcegroup.RunawayManager
runawaySyncer *runawaySyncer
resourceGroupsController *rmclient.ResourceGroupsController

serverID uint64
Expand Down Expand Up @@ -1242,6 +1239,7 @@ func (do *Domain) Init(
do.wg.Run(do.infoSyncerKeeper, "infoSyncerKeeper")
do.wg.Run(do.globalConfigSyncerKeeper, "globalConfigSyncerKeeper")
do.wg.Run(do.runawayRecordFlushLoop, "runawayRecordFlushLoop")
do.wg.Run(do.runawayWatchSyncLoop, "runawayWatchSyncLoop")
if !skipRegisterToDashboard {
do.wg.Run(do.topologySyncerKeeper, "topologySyncerKeeper")
}
Expand Down Expand Up @@ -1269,255 +1267,6 @@ func (do *Domain) SetOnClose(onClose func()) {
do.onClose = onClose
}

// Timing and batching parameters for flushing runaway/quarantine records to
// system tables and for garbage-collecting expired rows from them.
const (
// runawayRecordFluashInterval is the period of the timer that triggers a
// flush of buffered runaway records.
runawayRecordFluashInterval = time.Second
// quarantineRecordGCInterval is how often expired rows are deleted from
// mysql.tidb_runaway_quarantined_watch.
quarantineRecordGCInterval = time.Minute * 10
// runawayRecordGCInterval is how often expired rows are deleted from
// mysql.tidb_runaway_queries.
runawayRecordGCInterval = time.Hour * 24
// runawayRecordExpiredDuration is the age after which a row of
// mysql.tidb_runaway_queries is considered expired.
runawayRecordExpiredDuration = time.Hour * 24 * 7

// runawayRecordGCBatchSize is the maximum number of rows removed by one
// DELETE statement.
runawayRecordGCBatchSize = 100
// runawayRecordGCSelectBatchSize is the row limit passed to the scan query
// generator for each scan statement.
runawayRecordGCSelectBatchSize = runawayRecordGCBatchSize * 5
)

// systemSchemaCIStr is the "mysql" schema name, used to look up the system
// tables that are garbage-collected by deleteExpiredRows.
var systemSchemaCIStr = model.NewCIStr("mysql")

// deleteExpiredRows deletes rows of the system table mysql.<tableName> whose
// colName value is older than now-expiredDuration.
//
// It is a no-op unless this instance is the DDL owner. Rows are located with a
// scan statement (at most runawayRecordGCSelectBatchSize rows per statement)
// and removed in DELETE batches of at most runawayRecordGCBatchSize rows.
// All failures are logged; nothing is returned to the caller.
func (do *Domain) deleteExpiredRows(tableName, colName string, expiredDuration time.Duration) {
	if !do.DDL().OwnerManager().IsOwner() {
		return
	}
	failpoint.Inject("FastRunawayGC", func() {
		expiredDuration = time.Second * 1
	})
	expiredTime := time.Now().Add(-expiredDuration)
	tbCIStr := model.NewCIStr(tableName)
	tbl, err := do.InfoSchema().TableByName(systemSchemaCIStr, tbCIStr)
	if err != nil {
		logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
		return
	}
	tbInfo := tbl.Meta()
	col := tbInfo.FindPublicColumnByName(colName)
	if col == nil {
		logutil.BgLogger().Error("time column is not public in table", zap.String("table", tableName), zap.String("column", colName))
		return
	}
	tb, err := cache.NewBasePhysicalTable(systemSchemaCIStr, tbInfo, model.NewCIStr(""), col)
	if err != nil {
		logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
		return
	}
	generator, err := sqlbuilder.NewScanQueryGenerator(tb, expiredTime, nil, nil)
	if err != nil {
		logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
		return
	}

	// NOTE(review): leftRows is fully drained by the delete loop below before
	// it is handed back to NextSQL on the next iteration — confirm that the
	// generator does not need the last scanned rows to continue the scan.
	var leftRows [][]types.Datum
	for {
		scanSQL, err := generator.NextSQL(leftRows, runawayRecordGCSelectBatchSize)
		if err != nil {
			logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
			return
		}
		// An empty statement means there is nothing more to scan.
		if len(scanSQL) == 0 {
			return
		}

		rows, err := do.execRestrictedSQL(scanSQL, nil)
		if err != nil {
			// Report the error of the scan statement itself.
			logutil.BgLogger().Error("delete system table failed", zap.String("table", tableName), zap.Error(err))
			return
		}
		leftRows = make([][]types.Datum, len(rows))
		for i, row := range rows {
			leftRows[i] = row.GetDatumRow(tb.KeyColumnTypes)
		}

		for len(leftRows) > 0 {
			var delBatch [][]types.Datum
			if len(leftRows) < runawayRecordGCBatchSize {
				delBatch = leftRows
				leftRows = nil
			} else {
				delBatch = leftRows[0:runawayRecordGCBatchSize]
				leftRows = leftRows[runawayRecordGCBatchSize:]
			}
			delSQL, err := sqlbuilder.BuildDeleteSQL(tb, delBatch, expiredTime)
			if err != nil {
				logutil.BgLogger().Error(
					"build delete SQL failed when deleting system table",
					zap.Error(err),
					zap.String("table", tb.Schema.O+"."+tb.Name.O),
				)
				return
			}

			// A failed DELETE is logged but does not abort the remaining
			// batches.
			if _, err = do.execRestrictedSQL(delSQL, nil); err != nil {
				logutil.BgLogger().Error(
					"delete SQL failed when deleting system table", zap.Error(err), zap.String("SQL", delSQL),
				)
			}
		}
	}
}

// runawayRecordFlushLoop is the background loop that persists runaway and
// quarantine records received from the runaway manager and periodically
// triggers garbage collection of the corresponding system tables.
//
// Runaway records are buffered and written when the buffer reaches the
// manager's flush threshold or when the flush timer fires. Quarantine records
// are written as soon as the channel is drained or the threshold is reached.
// The loop returns when do.exit is signalled.
func (do *Domain) runawayRecordFlushLoop() {
	defer util.Recover(metrics.LabelDomain, "runawayRecordFlushLoop", nil, false)

	// This timer is used to batch the flushing of records: buffered records
	// are written out when it fires, runawayRecordFluashInterval after it was
	// last armed.
	flushTimer := time.NewTimer(runawayRecordFluashInterval)
	recordGCTicker := time.NewTicker(runawayRecordGCInterval)
	quarantineGCTicker := time.NewTicker(quarantineRecordGCInterval)
	failpoint.Inject("FastRunawayGC", func() {
		flushTimer.Stop()
		recordGCTicker.Stop()
		quarantineGCTicker.Stop()
		flushTimer = time.NewTimer(time.Millisecond * 50)
		recordGCTicker = time.NewTicker(time.Millisecond * 200)
		quarantineGCTicker = time.NewTicker(time.Millisecond * 200)
	})
	// Release the timer and tickers when the loop exits. A closure is used so
	// that the instances possibly replaced by the failpoint above are the
	// ones that get stopped.
	defer func() {
		flushTimer.Stop()
		recordGCTicker.Stop()
		quarantineGCTicker.Stop()
	}()

	// fired records that the flush timer has expired and must be re-armed
	// when the next record arrives.
	fired := false
	recordCh := do.RunawayManager().RunawayRecordChan()
	quarantineRecordCh := do.RunawayManager().QuarantineRecordChan()
	flushThreshold := do.runawayManager.FlushThreshold()
	records := make([]*resourcegroup.RunawayRecord, 0, flushThreshold)
	quarantineRecords := make([]*resourcegroup.QuarantineRecord, 0)

	// flushRunawayRecords writes all buffered runaway records in one
	// statement and empties the buffer, even if the write failed.
	flushRunawayRecords := func() {
		if len(records) == 0 {
			return
		}
		sql, params := genRunawayQueriesStmt(records)
		if _, err := do.execRestrictedSQL(sql, params); err != nil {
			logutil.BgLogger().Error("flush runaway records failed", zap.Error(err), zap.Int("count", len(records)))
		}
		records = records[:0]
	}
	// flushQuarantineRecords does the same for buffered quarantine records.
	flushQuarantineRecords := func() {
		if len(quarantineRecords) == 0 {
			return
		}
		sql, params := genQuarantineQueriesStmt(quarantineRecords)
		if _, err := do.execRestrictedSQL(sql, params); err != nil {
			logutil.BgLogger().Error("flush quarantine records failed", zap.Error(err), zap.Int("count", len(quarantineRecords)))
		}
		quarantineRecords = quarantineRecords[:0]
	}

	for {
		select {
		case <-do.exit:
			return
		case <-flushTimer.C:
			flushRunawayRecords()
			fired = true
		case r := <-quarantineRecordCh:
			quarantineRecords = append(quarantineRecords, r)
			// Quarantine records are not expected to be produced very often,
			// so always flush as soon as possible.
			if len(quarantineRecordCh) == 0 || len(quarantineRecords) >= flushThreshold {
				flushQuarantineRecords()
			}
		case r := <-recordCh:
			records = append(records, r)
			failpoint.Inject("FastRunawayGC", func() {
				flushRunawayRecords()
			})
			if len(records) >= flushThreshold {
				flushRunawayRecords()
			} else if fired {
				fired = false
				// A new record arrived after the timer expired: re-arm it.
				flushTimer.Reset(runawayRecordFluashInterval)
			}
		case <-recordGCTicker.C:
			go do.deleteExpiredRows("tidb_runaway_queries", "time", runawayRecordExpiredDuration)
		case <-quarantineGCTicker.C:
			go do.deleteExpiredRows("tidb_runaway_quarantined_watch", "end_time", time.Duration(0))
		}
	}
}

// execRestrictedSQL runs sql with params as a restricted internal statement on
// a session borrowed from do.sysSessionPool and returns the resulting rows.
//
// It returns an annotated error when no session can be obtained, or the error
// reported by ExecRestrictedSQL.
func (do *Domain) execRestrictedSQL(sql string, params []interface{}) ([]chunk.Row, error) {
	se, err := do.sysSessionPool.Get()
	if err != nil {
		return nil, errors.Annotate(err, "get session failed")
	}
	// Hand the session back only after Get has succeeded, so that a failed
	// Get never returns an unusable value to the pool.
	defer do.sysSessionPool.Put(se)

	exec := se.(sqlexec.RestrictedSQLExecutor)
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
	r, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession},
		sql, params...,
	)
	return r, err
}

// genRunawayQueriesStmt builds a single multi-row INSERT statement for
// mysql.tidb_runaway_queries together with its positional arguments.
// Each record contributes one 7-column placeholder tuple and, in order, its
// ResourceGroupName, Time, Match, Action, SQLText, PlanDigest and From values.
func genRunawayQueriesStmt(records []*resourcegroup.RunawayRecord) (string, []interface{}) {
	const rowPlaceholder = "(%?, %?, %?, %?, %?, %?, %?)"

	var stmt strings.Builder
	stmt.WriteString("insert into mysql.tidb_runaway_queries VALUES ")
	args := make([]interface{}, 0, len(records)*7)
	for i, rec := range records {
		// Tuples after the first one are comma-separated.
		if i != 0 {
			stmt.WriteByte(',')
		}
		stmt.WriteString(rowPlaceholder)
		args = append(args,
			rec.ResourceGroupName,
			rec.Time,
			rec.Match,
			rec.Action,
			rec.SQLText,
			rec.PlanDigest,
			rec.From,
		)
	}
	return stmt.String(), args
}

// genQuarantineQueriesStmt builds a single multi-row INSERT statement for
// mysql.tidb_runaway_quarantined_watch together with its positional arguments.
// Each record contributes one 6-column placeholder tuple and, in order, its
// ResourceGroupName, StartTime, EndTime, Watch, WatchText and Source values.
func genQuarantineQueriesStmt(records []*resourcegroup.QuarantineRecord) (string, []interface{}) {
	// Number of values appended per record; must match the placeholder tuple.
	const columnsPerRecord = 6

	var builder strings.Builder
	params := make([]interface{}, 0, len(records)*columnsPerRecord)
	builder.WriteString("insert into mysql.tidb_runaway_quarantined_watch VALUES ")
	for count, r := range records {
		if count > 0 {
			builder.WriteByte(',')
		}
		builder.WriteString("(%?, %?, %?, %?, %?, %?)")
		params = append(params,
			r.ResourceGroupName,
			r.StartTime,
			r.EndTime,
			r.Watch,
			r.WatchText,
			r.Source,
		)
	}
	return builder.String(), params
}

// initResourceGroupsController creates and starts the resource group
// controller backed by pdClient, builds the runaway manager for this server's
// address, and installs the controller as the TiKV resource control
// interceptor.
//
// When pdClient is nil it only logs a warning and returns nil. Otherwise it
// returns the error from looking up the server info or from creating the
// controller.
func (do *Domain) initResourceGroupsController(ctx context.Context, pdClient pd.Client) error {
	if pdClient == nil {
		logutil.BgLogger().Warn("cannot setup up resource controller, not using tikv storage")
		// return nil as unistore doesn't support it
		return nil
	}

	// Resolve the server address before the controller is created and
	// started, so that a failure here does not leave a started controller
	// behind that nothing references.
	serverInfo, err := infosync.GetServerInfo()
	if err != nil {
		return err
	}
	serverAddr := net.JoinHostPort(serverInfo.IP, strconv.Itoa(int(serverInfo.Port)))

	control, err := rmclient.NewResourceGroupController(ctx, do.ServerID(), pdClient, nil, rmclient.WithMaxWaitDuration(resourcegroup.MaxWaitDuration))
	if err != nil {
		return err
	}
	control.Start(ctx)

	do.runawayManager = resourcegroup.NewRunawayManager(control, serverAddr)
	do.resourceGroupsController = control
	tikv.SetResourceControlInterceptor(control)
	return nil
}

func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error {
cfg := config.GetGlobalConfig()
if pdClient == nil || do.etcdClient == nil {
Expand Down
Loading