Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: revise plan replayer process log #40126

Merged
merged 9 commits into from
Dec 26, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 40 additions & 14 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,21 @@ type planReplayerHandle struct {
}

// SendTask send dumpTask in background task handler
func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) {
func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) bool {
select {
case h.planReplayerTaskDumpHandle.taskCH <- task:
// we directly remove the task key if we put task in channel successfully, if the task was failed to dump,
// the task handle will re-add the task in next loop
if !task.IsContinuesCapture {
h.planReplayerTaskCollectorHandle.removeTask(task.PlanReplayerTaskKey)
}
return true
default:
// TODO: add metrics here
// directly discard the task if the task channel is full in order not to block the query process
logutil.BgLogger().Info("discard one plan replayer dump task",
zap.String("sql digest", task.SQLDigest), zap.String("plan digest", task.PlanDigest))
logutil.BgLogger().Warn("discard one plan replayer dump task",
zap.String("sql-digest", task.SQLDigest), zap.String("plan-digest", task.PlanDigest))
return false
}
}

Expand All @@ -209,9 +211,13 @@ func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error {
for _, key := range allKeys {
unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, key)
if err != nil {
logutil.BgLogger().Warn("[plan-replayer-task] collect plan replayer task failed", zap.Error(err))
return err
}
if unhandled {
logutil.BgLogger().Debug("[plan-replayer-task] collect plan replayer task success",
zap.String("sql-digest", key.SQLDigest),
zap.String("plan-digest", key.PlanDigest))
tasks = append(tasks, key)
}
}
Expand Down Expand Up @@ -351,16 +357,36 @@ type planReplayerTaskDumpWorker struct {

func (w *planReplayerTaskDumpWorker) run() {
for task := range w.taskCH {
w.handleTask(task)
}
}

func (w *planReplayerTaskDumpWorker) handleTask(task *PlanReplayerDumpTask) {
sqlDigest := task.SQLDigest
planDigest := task.PlanDigest
check := true
occupy := true
handleTask := true
defer func() {
logutil.BgLogger().Debug("[plan-replayer-capture] handle task",
zap.String("sql-digest", sqlDigest),
zap.String("plan-digest", planDigest),
zap.Bool("check", check),
zap.Bool("occupy", occupy),
zap.Bool("handle", handleTask))
}()
if task.IsContinuesCapture {
if w.status.checkTaskKeyFinishedBefore(task) {
continue
check = false
return
}
successOccupy := w.status.occupyRunningTaskKey(task)
if !successOccupy {
continue
}
w.HandleTask(task)
w.status.releaseRunningTaskKey(task)
}
occupy = w.status.occupyRunningTaskKey(task)
if !occupy {
return
}
handleTask = w.HandleTask(task)
w.status.releaseRunningTaskKey(task)
}

// HandleTask handled task
Expand All @@ -373,7 +399,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
taskKey := task.PlanReplayerTaskKey
unhandled, err := checkUnHandledReplayerTask(w.ctx, w.sctx, taskKey)
if err != nil {
logutil.BgLogger().Warn("check plan replayer capture task failed",
logutil.BgLogger().Warn("[plan-replayer-capture] check task failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
Expand All @@ -386,7 +412,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc

file, fileName, err := replayer.GeneratePlanReplayerFile(task.IsContinuesCapture)
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task file failed",
logutil.BgLogger().Warn("[plan-replayer-capture] generate task file failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
Expand All @@ -409,7 +435,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
}
r, err := handle.GenJSONTableFromStats(schema.Name.String(), tbl.Meta(), stat.(*statistics.Table))
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task json stats failed",
logutil.BgLogger().Warn("[plan-replayer-capture] generate task json stats failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
Expand All @@ -421,7 +447,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
}
err = DumpPlanReplayerInfo(w.ctx, w.sctx, task)
if err != nil {
logutil.BgLogger().Warn("dump plan replayer capture task result failed",
logutil.BgLogger().Warn("[plan-replayer-capture] dump task result failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
var records []PlanReplayerStatusRecord
defer func() {
if err != nil {
logutil.BgLogger().Error("dump plan replayer failed", zap.Error(err))
logutil.BgLogger().Error("[plan-replayer] dump file failed", zap.Error(err))
}
err = zw.Close()
if err != nil {
Expand Down
28 changes: 23 additions & 5 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,25 @@ func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode
if handle == nil {
return
}
captured := false
tasks := handle.GetTasks()
_, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := getPlanDigest(sctx.GetSessionVars().StmtCtx)
defer func() {
logutil.BgLogger().Debug("[plan-replayer-capture] check capture task",
zap.String("sql-digest", sqlDigest.String()),
zap.String("plan-digest", planDigest.String()),
zap.Int("tasks", len(tasks)),
zap.Bool("captured", captured))
}()
key := replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
for _, task := range tasks {
if task.SQLDigest == sqlDigest.String() {
if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() {
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false)
captured = sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false)
return
}
}
Expand All @@ -215,16 +223,26 @@ func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.Stm
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
captured := false
defer func() {
logutil.BgLogger().Debug("[plan-replayer-capture] check continues capture task",
zap.String("sql-digest", sqlDigest.String()),
zap.String("plan-digest", planDigest.String()),
zap.Bool("captured", captured))
}()

existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key)
if existed {
return
}
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true)
sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key)
captured = sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true)
if captured {
sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key)
}
}

func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode,
startTS uint64, isContinuesCapture bool) {
startTS uint64, isContinuesCapture bool) bool {
stmtCtx := sctx.GetSessionVars().StmtCtx
handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle)
dumpTask := &domain.PlanReplayerDumpTask{
Expand All @@ -239,7 +257,7 @@ func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.
IsCapture: true,
IsContinuesCapture: isContinuesCapture,
}
domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask)
return domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask)
}

// needLowerPriority checks whether it's needed to lower the execution priority
Expand Down