Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: reject setting read ts to a future time #25732

Merged
merged 16 commits into from
Jun 25, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2691,6 +2691,12 @@ func (s *testSuiteP2) TestHistoryRead(c *C) {
// SnapshotTS Is not updated if check failed.
c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0))

// Setting snapshot to a time in the future will fail. (One day before the 2038 problem)
_, err = tk.Exec("set @@tidb_snapshot = '2038-01-18 03:14:07'")
c.Assert(err, ErrorMatches, "cannot set read timestamp to a future time")
// SnapshotTS Is not updated if check failed.
c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0))

curVer1, _ := s.store.CurrentVersion(kv.GlobalTxnScope)
time.Sleep(time.Millisecond)
snapshotTime := time.Now()
Expand Down Expand Up @@ -2756,6 +2762,15 @@ func (s *testSuite2) TestLowResolutionTSORead(c *C) {
tk.MustQuery("select * from low_resolution_tso").Check(testkit.Rows("2"))
}

func (s *testSuite2) TestStaleReadFutureTime(c *C) {
tk := testkit.NewTestKit(c, s.store)
// Setting tx_read_ts to a time in the future will fail. (One day before the 2038 problem)
_, err := tk.Exec("set @@tx_read_ts = '2038-01-18 03:14:07'")
c.Assert(err, ErrorMatches, "cannot set read timestamp to a future time")
// TxnReadTS Is not updated if check failed.
c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(0))
}

func (s *testSuite) TestScanControlSelection(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
22 changes: 16 additions & 6 deletions executor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
Expand Down Expand Up @@ -93,14 +94,14 @@ func (e *SetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
continue
}

if err := e.setSysVariable(name, v); err != nil {
if err := e.setSysVariable(ctx, name, v); err != nil {
return err
}
}
return nil
}

func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) error {
func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expression.VarAssignment) error {
sessionVars := e.ctx.GetSessionVars()
sysVar := variable.GetSysVar(name)
if sysVar == nil {
Expand Down Expand Up @@ -159,15 +160,24 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e
}
newSnapshotTS := getSnapshotTSByName()
newSnapshotIsSet := newSnapshotTS > 0 && newSnapshotTS != oldSnapshotTS
// We don't check snapshot with gc safe point for read_ts
// Client-go will automatically check the snapshotTS with gc safe point. It's unnecessary to check gc safe point during set executor.
if newSnapshotIsSet && name != variable.TiDBTxnReadTS {
err = gcutil.ValidateSnapshot(e.ctx, newSnapshotTS)
if newSnapshotIsSet {
if name == variable.TiDBTxnReadTS {
err = sessionctx.ValidateStaleReadTS(ctx, e.ctx, newSnapshotTS)
} else {
err = sessionctx.ValidateSnapshotReadTS(ctx, e.ctx, newSnapshotTS)
// Also check gc safe point for snapshot read.
// We don't check snapshot with gc safe point for read_ts
// Client-go will automatically check the snapshotTS with gc safe point. It's unnecessary to check gc safe point during set executor.
if err == nil {
err = gcutil.ValidateSnapshot(e.ctx, newSnapshotTS)
}
}
if err != nil {
fallbackOldSnapshotTS()
return err
}
}

err = e.loadSnapshotInfoSchemaIfNeeded(newSnapshotTS)
if err != nil {
fallbackOldSnapshotTS()
Expand Down
22 changes: 22 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,3 +938,25 @@ func (s *testStaleTxnSuite) TestStaleSelect(c *C) {
time.Sleep(tolerance)
tk.MustQuery(fmt.Sprintf("select * from t as of timestamp '%s' where c=5", time6.Format("2006-1-2 15:04:05.000"))).Check(testkit.Rows("4 5 <nil>"))
}

func (s *testStaleTxnSuite) TestStaleReadFutureTime(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
defer tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int)")

// Setting tx_read_ts to a time in the future will fail. (One day before the 2038 problem)
_, err := tk.Exec("start transaction read only as of timestamp '2038-01-18 03:14:07'")
c.Assert(err, ErrorMatches, "cannot set read timestamp to a future time")
// Transaction should not be started and read ts should not be set if check fails
c.Assert(tk.Se.GetSessionVars().InTxn(), IsFalse)
c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(0))

_, err = tk.Exec("set transaction read only as of timestamp '2038-01-18 03:14:07'")
c.Assert(err, ErrorMatches, "cannot set read timestamp to a future time")
c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(0))

_, err = tk.Exec("select * from t as of timestamp '2038-01-18 03:14:07'")
c.Assert(err, ErrorMatches, "cannot set read timestamp to a future time")
}
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func RegisterMetrics() {
prometheus.MustRegister(StatementDeadlockDetectDuration)
prometheus.MustRegister(StatementPessimisticRetryCount)
prometheus.MustRegister(StatementLockKeysCount)
prometheus.MustRegister(ValidateReadTSFromPDCount)
prometheus.MustRegister(UpdateSelfVersionHistogram)
prometheus.MustRegister(UpdateStatsCounter)
prometheus.MustRegister(WatchOwnerCounter)
Expand Down
8 changes: 8 additions & 0 deletions metrics/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ var (
Help: "Keys locking for a single statement",
Buckets: prometheus.ExponentialBuckets(1, 2, 21), // 1 ~ 1048576
})

ValidateReadTSFromPDCount = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "session",
Name: "validate_read_ts_from_pd_count",
Help: "Counter of validating read ts by getting a timestamp from PD",
})
)

// Label constants.
Expand Down
3 changes: 3 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2453,6 +2453,9 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
if err != nil {
return nil, err
}
if err := sessionctx.ValidateStaleReadTS(ctx, b.ctx, startTS); err != nil {
return nil, err
}
p.StaleTxnStartTS = startTS
} else if readTS > 0 {
p.StaleTxnStartTS = readTS
Expand Down
5 changes: 5 additions & 0 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package core

import (
"context"
"fmt"
"math"
"strings"
Expand Down Expand Up @@ -1525,6 +1526,10 @@ func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) {
if p.err != nil {
return
}
if err := sessionctx.ValidateStaleReadTS(context.Background(), p.ctx, ts); err != nil {
p.err = errors.Trace(err)
return
}
if !p.initedLastSnapshotTS {
p.SnapshotTSEvaluator = func(ctx sessionctx.Context) (uint64, error) {
return calculateTsExpr(ctx, node)
Expand Down
40 changes: 40 additions & 0 deletions sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@ import (
"context"
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/sli"
"github.com/pingcap/tipb/go-binlog"
"github.com/tikv/client-go/v2/oracle"
)

// InfoschemaMetaVersion is a workaround. Due to circular dependency,
Expand Down Expand Up @@ -149,3 +152,40 @@ const (
// LastExecuteDDL is the key for whether the session execute a ddl command last time.
LastExecuteDDL basicCtxType = 3
)

// ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp
func ValidateSnapshotReadTS(ctx context.Context, sctx Context, readTS uint64) error {
txnScope := sctx.GetSessionVars().CheckAndGetTxnScope()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should use oracle.GlobalTxnScope directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes. This looks safer.

latestTS, err := sctx.GetStore().GetOracle().GetLowResolutionTimestamp(ctx, &oracle.Option{TxnScope: txnScope})
// If we fail to get latestTS or the readTS exceeds it, get a timestamp from PD to double check
if err != nil || readTS > latestTS {
metrics.ValidateReadTSFromPDCount.Inc()
currentVer, err := sctx.GetStore().CurrentVersion(txnScope)
nolouch marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Errorf("fail to validate read timestamp: %v", err)
}
if readTS > currentVer.Ver {
return errors.Errorf("cannot set read timestamp to a future time")
}
}
return nil
}

// ValidateStaleReadTS validates that readTS does not exceed the current time not strictly.
func ValidateStaleReadTS(ctx context.Context, sctx Context, readTS uint64) error {
txnScope := sctx.GetSessionVars().CheckAndGetTxnScope()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the check for stale read need to use global scope? This can be not strict so I guess a close PD would suffice?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetStaleTimestamp is directly visit the cache in memory, so I think it's ok to use oracle.GlobalTxnScope.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, if you insist on using txnScope here, there should be config.GetTxnScopeFromConfig

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I change to use global scope. I think there's little difference.

currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, txnScope, 0)
// If we fail to calculate currentTS from local time, fallback to get a timestamp from PD
if err != nil {
metrics.ValidateReadTSFromPDCount.Inc()
currentVer, err := sctx.GetStore().CurrentVersion(txnScope)
if err != nil {
return errors.Errorf("fail to validate read timestamp: %v", err)
}
currentTS = currentVer.Ver
}
if readTS > currentTS {
return errors.Errorf("cannot set read timestamp to a future time")
}
return nil
}