Skip to content

Commit

Permalink
domain: support plan_replayer_task system table (#39019)
Browse files Browse the repository at this point in the history
close #38779
  • Loading branch information
Yisaer authored Nov 11, 2022
1 parent 3bbffdf commit ddb4355
Show file tree
Hide file tree
Showing 10 changed files with 267 additions and 34 deletions.
3 changes: 3 additions & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_library(
"//telemetry",
"//types",
"//util",
"//util/chunk",
"//util/dbterror",
"//util/domainutil",
"//util/engine",
Expand Down Expand Up @@ -86,6 +87,7 @@ go_test(
"domain_utils_test.go",
"domainctx_test.go",
"main_test.go",
"plan_replayer_handle_test.go",
"plan_replayer_test.go",
"schema_checker_test.go",
"schema_validator_test.go",
Expand All @@ -109,6 +111,7 @@ go_test(
"//session",
"//sessionctx/variable",
"//store/mockstore",
"//testkit",
"//testkit/testsetup",
"//util",
"//util/mock",
Expand Down
40 changes: 37 additions & 3 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1533,12 +1533,46 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) {

// SetupPlanReplayerHandle sets up the plan replayer handle on the domain and
// registers it with the dump-file GC checker so old dump files are cleaned up.
// Note: the diff capture retained both the old and new initializations; only
// the sctxMu-based form is kept, matching the struct's sctxMu.sctx layout.
func (do *Domain) SetupPlanReplayerHandle(ctx sessionctx.Context) {
	do.planReplayerHandle = &planReplayerHandle{}
	// The shared session context runs every internal SQL issued by the
	// handle; access is serialized via sctxMu.
	do.planReplayerHandle.sctxMu.sctx = ctx
	do.dumpFileGcChecker.setupPlanReplayerHandle(do.planReplayerHandle)
}

// planReplayerHandleLease is the interval between two background collections
// of plan replayer tasks; StartPlanReplayerHandle treats a value below one
// nanosecond as "disabled".
var planReplayerHandleLease = 10 * time.Second

// DisablePlanReplayerBackgroundJob4Test disables the plan replayer background
// collection job by zeroing its lease. Intended for tests only.
func DisablePlanReplayerBackgroundJob4Test() {
	planReplayerHandleLease = 0
}

// StartPlanReplayerHandle starts the background goroutine that periodically
// collects plan replayer tasks from mysql.plan_replayer_task. It is a no-op
// when planReplayerHandleLease is non-positive (e.g. disabled for tests).
// The goroutine exits when the domain's exit channel is closed and is tracked
// by the domain's WaitGroup.
func (do *Domain) StartPlanReplayerHandle() {
	if planReplayerHandleLease < 1 {
		return
	}
	do.wg.Add(1)
	go func() {
		// Fixed the misspelled local "tikcer" -> "ticker".
		ticker := time.NewTicker(planReplayerHandleLease)
		defer func() {
			ticker.Stop()
			do.wg.Done()
			logutil.BgLogger().Info("PlanReplayerHandle exited.")
			util.Recover(metrics.LabelDomain, "PlanReplayerHandle", nil, false)
		}()
		for {
			select {
			case <-do.exit:
				return
			case <-ticker.C:
				err := do.planReplayerHandle.CollectPlanReplayerTask(context.Background())
				if err != nil {
					logutil.BgLogger().Warn("plan replayer handle collect tasks failed", zap.Error(err))
				}
			}
		}
	}()
}

// GetPlanReplayerHandle returns plan replayer handle
func (do *Domain) GetPlanReplayerHandle() *planReplayerHandle {
return do.planReplayerHandle
Expand Down
144 changes: 133 additions & 11 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,16 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
Expand Down Expand Up @@ -115,16 +121,23 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {
}

// planReplayerHandle drives collection and bookkeeping of plan replayer
// tasks stored in the mysql.plan_replayer_task / plan_replayer_status tables.
type planReplayerHandle struct {
	// sctxMu guards the shared session context used to execute internal
	// SQL against the plan replayer system tables.
	sctxMu struct {
		sync.Mutex
		sctx sessionctx.Context
	}

	// taskMu guards the in-memory set of collected, still-unhandled task
	// keys; reads (GetTasks) use the R-lock, replacement (setupTasks) the
	// W-lock.
	taskMu struct {
		sync.RWMutex
		tasks map[PlanReplayerTaskKey]struct{}
	}
}

// DeletePlanReplayerStatus delete mysql.plan_replayer_status record
func (h *planReplayerHandle) deletePlanReplayerStatus(ctx context.Context, token string) {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
h.Lock()
defer h.Unlock()
exec := h.sctx.(sqlexec.SQLExecutor)
h.sctxMu.Lock()
defer h.sctxMu.Unlock()
exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx1, fmt.Sprintf("delete from mysql.plan_replayer_status where token = %v", token))
if err != nil {
logutil.BgLogger().Warn("delete mysql.plan_replayer_status record failed", zap.String("token", token), zap.Error(err))
Expand Down Expand Up @@ -154,9 +167,9 @@ func (h *planReplayerHandle) InsertPlanReplayerStatus(ctx context.Context, recor
}

func (h *planReplayerHandle) insertExternalPlanReplayerErrorStatusRecord(ctx context.Context, instance string, record PlanReplayerStatusRecord) {
h.Lock()
defer h.Unlock()
exec := h.sctx.(sqlexec.SQLExecutor)
h.sctxMu.Lock()
defer h.sctxMu.Unlock()
exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
"insert into mysql.plan_replayer_status (origin_sql, fail_reason, instance) values ('%s','%s','%s')",
record.OriginSQL, record.FailedReason, instance))
Expand All @@ -167,9 +180,9 @@ func (h *planReplayerHandle) insertExternalPlanReplayerErrorStatusRecord(ctx con
}

func (h *planReplayerHandle) insertExternalPlanReplayerSuccessStatusRecord(ctx context.Context, instance string, record PlanReplayerStatusRecord) {
h.Lock()
defer h.Unlock()
exec := h.sctx.(sqlexec.SQLExecutor)
h.sctxMu.Lock()
defer h.sctxMu.Unlock()
exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
"insert into mysql.plan_replayer_status (origin_sql, token, instance) values ('%s','%s','%s')",
record.OriginSQL, record.Token, instance))
Expand All @@ -179,10 +192,119 @@ func (h *planReplayerHandle) insertExternalPlanReplayerSuccessStatusRecord(ctx c
}
}

// CollectPlanReplayerTask reads every task registered in
// mysql.plan_replayer_task, filters out the ones that already have a
// successful record in mysql.plan_replayer_status, and replaces the
// in-memory task set with the remaining (unhandled) keys.
func (h *planReplayerHandle) CollectPlanReplayerTask(ctx context.Context) error {
	internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
	keys, err := h.collectAllPlanReplayerTask(internalCtx)
	if err != nil {
		return err
	}
	pending := make([]PlanReplayerTaskKey, 0)
	for _, key := range keys {
		notDone, checkErr := h.checkUnHandledReplayerTask(internalCtx, key)
		if checkErr != nil {
			return checkErr
		}
		if notDone {
			pending = append(pending, key)
		}
	}
	h.setupTasks(pending)
	return nil
}

// GetTasks returns a snapshot of all currently collected plan replayer task
// keys.
func (h *planReplayerHandle) GetTasks() []PlanReplayerTaskKey {
	h.taskMu.RLock()
	defer h.taskMu.RUnlock()
	// Pre-size the result: the map length is stable while the R-lock is
	// held, so the slice never reallocates.
	tasks := make([]PlanReplayerTaskKey, 0, len(h.taskMu.tasks))
	for taskKey := range h.taskMu.tasks {
		tasks = append(tasks, taskKey)
	}
	return tasks
}

// setupTasks atomically replaces the in-memory task set with the given keys.
func (h *planReplayerHandle) setupTasks(tasks []PlanReplayerTaskKey) {
	// Build the new set outside the lock (pre-sized to avoid rehashing)
	// so the critical section is just a pointer swap.
	r := make(map[PlanReplayerTaskKey]struct{}, len(tasks))
	for _, task := range tasks {
		r[task] = struct{}{}
	}
	h.taskMu.Lock()
	defer h.taskMu.Unlock()
	h.taskMu.tasks = r
}

// collectAllPlanReplayerTask loads every (sql_digest, plan_digest) pair
// registered in mysql.plan_replayer_task.
func (h *planReplayerHandle) collectAllPlanReplayerTask(ctx context.Context) ([]PlanReplayerTaskKey, error) {
	h.sctxMu.Lock()
	defer h.sctxMu.Unlock()
	exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
	rs, err := exec.ExecuteInternal(ctx, "select sql_digest, plan_digest from mysql.plan_replayer_task")
	if err != nil {
		return nil, err
	}
	if rs == nil {
		// No record set: treat as an empty task table.
		return nil, nil
	}
	defer terror.Call(rs.Close)
	var rows []chunk.Row
	if rows, err = sqlexec.DrainRecordSet(ctx, rs, 8); err != nil {
		return nil, errors.Trace(err)
	}
	keys := make([]PlanReplayerTaskKey, 0, len(rows))
	for _, row := range rows {
		keys = append(keys, PlanReplayerTaskKey{
			sqlDigest:  row.GetString(0),
			planDigest: row.GetString(1),
		})
	}
	return keys, nil
}

// checkUnHandledReplayerTask reports whether the given task still lacks a
// successful record (fail_reason is null) in mysql.plan_replayer_status.
func (h *planReplayerHandle) checkUnHandledReplayerTask(ctx context.Context, task PlanReplayerTaskKey) (bool, error) {
	h.sctxMu.Lock()
	defer h.sctxMu.Unlock()
	exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
	rs, err := exec.ExecuteInternal(ctx, fmt.Sprintf("select * from mysql.plan_replayer_status where sql_digest = '%v' and plan_digest = '%v' and fail_reason is null", task.sqlDigest, task.planDigest))
	if err != nil {
		return false, err
	}
	if rs == nil {
		// No record set at all: nothing handled this task yet.
		return true, nil
	}
	defer terror.Call(rs.Close)
	var rows []chunk.Row
	if rows, err = sqlexec.DrainRecordSet(ctx, rs, 8); err != nil {
		return false, errors.Trace(err)
	}
	// Unhandled iff no successful status row exists.
	return len(rows) == 0, nil
}

// PlanReplayerStatusRecord indicates record in mysql.plan_replayer_status
type PlanReplayerStatusRecord struct {
	Internal     bool   // NOTE(review): presumably marks internally-generated records — confirm with callers
	OriginSQL    string // the SQL statement the plan replayer dump was taken for
	Token        string // token written on success (see insertExternalPlanReplayerSuccessStatusRecord)
	FailedReason string // failure message written on error (see insertExternalPlanReplayerErrorStatusRecord)
}

// PlanReplayerTaskKey indicates key of a plan replayer task
type PlanReplayerTaskKey struct {
	sqlDigest  string // digest of the SQL text (sql_digest column)
	planDigest string // digest of the execution plan (plan_digest column)
}

// PlanReplayerDumpTask wrap the params for plan replayer dump
type PlanReplayerDumpTask struct {
	SessionBindings []*bindinfo.BindRecord      // session-level SQL bindings to include in the dump
	EncodedPlan     string                      // encoded plan text to dump
	FileName        string                      // name of the dump file
	Zf              *os.File                    // open file handle the dump archive is written to
	SessionVars     *variable.SessionVars       // session variables of the dumping session
	TblStats        map[int64]*handle.JSONTable // table statistics keyed by table ID
	ExecStmts       []ast.StmtNode              // parsed statements the dump covers
	Analyze         bool                        // assumes true means explain-analyze is run during dump — TODO confirm
}
64 changes: 64 additions & 0 deletions domain/plan_replayer_handle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package domain_test

import (
"context"
"testing"

"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)

// TestPlanReplayerHandleCollectTask verifies that CollectPlanReplayerTask
// picks up exactly the tasks in mysql.plan_replayer_task that have no
// successful entry in mysql.plan_replayer_status.
func TestPlanReplayerHandleCollectTask(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	prHandle := dom.GetPlanReplayerHandle()

	// reset clears both system tables so each scenario starts clean.
	reset := func() {
		tk.MustExec("delete from mysql.plan_replayer_task")
		tk.MustExec("delete from mysql.plan_replayer_status")
	}
	// collect runs the collection and asserts the number of pending tasks.
	collect := func(want int) {
		require.NoError(t, prHandle.CollectPlanReplayerTask(context.Background()))
		require.Len(t, prHandle.GetTasks(), want)
	}

	// One registered task, nothing handled yet.
	reset()
	tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('123','123');")
	collect(1)

	// No registered tasks.
	reset()
	collect(0)

	// Two tasks, one already handled successfully -> one pending.
	reset()
	tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('123','123');")
	tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('345','345');")
	tk.MustExec("insert into mysql.plan_replayer_status(sql_digest, plan_digest, token, instance) values ('123','123','123','123')")
	collect(1)

	// Two tasks, one with a failed attempt (fail_reason set) -> both pending.
	reset()
	tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('123','123');")
	tk.MustExec("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('345','345');")
	tk.MustExec("insert into mysql.plan_replayer_status(sql_digest, plan_digest, fail_reason, instance) values ('123','123','123','123')")
	collect(2)
}
2 changes: 1 addition & 1 deletion executor/infoschema_cluster_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestTableStorageStats(t *testing.T) {
"test 2",
))
rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()
result := 38
result := 39
require.Len(t, rows, result)

// More tests about the privileges.
Expand Down
18 changes: 3 additions & 15 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func generatePlanReplayerFileName() (string, error) {
func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
fileName := e.FileName
zf := e.File
task := &PlanReplayerDumpTask{
task := &domain.PlanReplayerDumpTask{
FileName: fileName,
Zf: zf,
SessionVars: e.ctx.GetSessionVars(),
Expand All @@ -242,18 +242,6 @@ func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
return nil
}

// PlanReplayerDumpTask wrap the params for plan replayer dump
type PlanReplayerDumpTask struct {
SessionBindings []*bindinfo.BindRecord
EncodedPlan string
FileName string
Zf *os.File
SessionVars *variable.SessionVars
TblStats map[int64]*handle.JSONTable
ExecStmts []ast.StmtNode
Analyze bool
}

// DumpPlanReplayerInfo will dump the information about sqls.
// The files will be organized into the following format:
/*
Expand Down Expand Up @@ -284,7 +272,7 @@ type PlanReplayerDumpTask struct {
|-....
*/
func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
task *PlanReplayerDumpTask) (err error) {
task *domain.PlanReplayerDumpTask) (err error) {
zf := task.Zf
fileName := task.FileName
sessionVars := task.SessionVars
Expand Down Expand Up @@ -373,7 +361,7 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
return dumpExplain(sctx, zw, execStmts, task.Analyze)
}

func generateRecords(task *PlanReplayerDumpTask) []domain.PlanReplayerStatusRecord {
func generateRecords(task *domain.PlanReplayerDumpTask) []domain.PlanReplayerStatusRecord {
records := make([]domain.PlanReplayerStatusRecord, 0)
if len(task.ExecStmts) > 0 {
for _, execStmt := range task.ExecStmts {
Expand Down
Loading

0 comments on commit ddb4355

Please sign in to comment.