Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: recalculate as-of ts of staleread when plan is cached #43204

Merged
merged 2 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1370,3 +1370,29 @@ func TestIssue35686(t *testing.T) {
// This query should not panic
tk.MustQuery("select * from information_schema.ddl_jobs as of timestamp now()")
}

func TestStalePrepare(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
defer tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int)")

stmtID, _, _, err := tk.Session().PrepareStmt("select * from t as of timestamp now(3) - interval 1000 microsecond order by id asc")
require.Nil(t, err)
tk.MustExec("prepare stmt from \"select * from t as of timestamp now(3) - interval 1000 microsecond order by id asc\"")

var expected [][]interface{}
for i := 0; i < 20; i++ {
tk.MustExec("insert into t values(?)", i)
time.Sleep(2 * time.Millisecond) // sleep 2ms to ensure staleread_ts > commit_ts.

expected = append(expected, testkit.Rows(fmt.Sprintf("%d", i))...)
rs, err := tk.Session().ExecutePreparedStmt(context.Background(), stmtID, nil)
require.Nil(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(expected)
rs.Close()
tk.MustQuery("execute stmt").Check(expected)
}
}
21 changes: 17 additions & 4 deletions sessiontxn/staleread/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error {
}

// If `stmtAsOfTS` is not 0, it means we use 'select ... from xxx as of timestamp ...'
stmtAsOfTS, err := parseAndValidateAsOf(p.ctx, p.sctx, tn.AsOf)
evaluateTS := func(sctx sessionctx.Context) (uint64, error) {
return parseAndValidateAsOf(context.Background(), p.sctx, tn.AsOf)
}
stmtAsOfTS, err := evaluateTS(p.sctx)
if err != nil {
return err
}
Expand All @@ -179,7 +182,7 @@ func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error {
}
return nil
}
return p.evaluateFromStmtTSOrSysVariable(stmtAsOfTS)
return p.evaluateFromStmtTSOrSysVariable(stmtAsOfTS, evaluateTS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between OnExecutePreparedStmt and OnSelectTable, why do we need to calculate the TS in two places? Is it possible to merge them into one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we set the ts evaluator which needs to be calculated to staleness processor when preparing, and it'll be returned and set to plan cache.

When invoking OnExecutePreparedStmt, it only uses the ts evaluator from plan cache, so it's not necessary to set a evaluator that requires calculation to get a timestamp.

}

func (p *staleReadProcessor) OnExecutePreparedStmt(preparedTSEvaluator StalenessTSEvaluator) (err error) {
Expand All @@ -201,7 +204,10 @@ func (p *staleReadProcessor) OnExecutePreparedStmt(preparedTSEvaluator Staleness
return err
}
}
return p.evaluateFromStmtTSOrSysVariable(stmtTS)
// When executing a prepared stmt, the stmtTS is calculated once and reused to avoid eval overhead,
// note it only takes PlanCacheStmt.SnapshotTSEvaluator without overwriting it.
// the evaluator will be re-calculated in next execution.
return p.evaluateFromStmtTSOrSysVariable(stmtTS, nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add some comments about the above logic.

}

func (p *staleReadProcessor) evaluateFromTxn() error {
Expand All @@ -223,7 +229,7 @@ func (p *staleReadProcessor) evaluateFromTxn() error {
return p.setAsNonStaleRead()
}

func (p *staleReadProcessor) evaluateFromStmtTSOrSysVariable(stmtTS uint64) error {
func (p *staleReadProcessor) evaluateFromStmtTSOrSysVariable(stmtTS uint64, evaluator StalenessTSEvaluator) error {
// If `txnReadTS` is not 0, it means we meet following situation:
// set transaction read only as of timestamp ...
// select from table or execute prepared statement
Expand All @@ -235,6 +241,13 @@ func (p *staleReadProcessor) evaluateFromStmtTSOrSysVariable(stmtTS uint64) erro

if stmtTS > 0 {
p.stmtTS = stmtTS
if evaluator != nil {
is, err := GetSessionSnapshotInfoSchema(p.sctx, stmtTS)
if err != nil {
return err
}
return p.setEvaluatedValues(stmtTS, is, evaluator)
}
return p.setEvaluatedTS(stmtTS)
}

Expand Down