Skip to content

Commit

Permalink
executor: reject setting read ts to a future time (#25732)
Browse files Browse the repository at this point in the history
  • Loading branch information
sticnarf authored Jun 25, 2021
1 parent 7ace592 commit 92ddceb
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 6 deletions.
15 changes: 15 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2691,6 +2691,12 @@ func (s *testSuiteP2) TestHistoryRead(c *C) {
// SnapshotTS Is not updated if check failed.
c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0))

// Setting snapshot to a time in the future will fail. (One day before the 2038 problem)
_, err = tk.Exec("set @@tidb_snapshot = '2038-01-18 03:14:07'")
c.Assert(err, ErrorMatches, "cannot set read timestamp to a future time")
// SnapshotTS Is not updated if check failed.
c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0))

curVer1, _ := s.store.CurrentVersion(kv.GlobalTxnScope)
time.Sleep(time.Millisecond)
snapshotTime := time.Now()
Expand Down Expand Up @@ -2756,6 +2762,15 @@ func (s *testSuite2) TestLowResolutionTSORead(c *C) {
tk.MustQuery("select * from low_resolution_tso").Check(testkit.Rows("2"))
}

func (s *testSuite2) TestStaleReadFutureTime(c *C) {
tk := testkit.NewTestKit(c, s.store)
// Setting tx_read_ts to a time in the future will fail. (One day before the 2038 problem)
_, err := tk.Exec("set @@tx_read_ts = '2038-01-18 03:14:07'")
c.Assert(err, ErrorMatches, "cannot set read timestamp to a future time")
// TxnReadTS Is not updated if check failed.
c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(0))
}

func (s *testSuite) TestScanControlSelection(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
22 changes: 16 additions & 6 deletions executor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
Expand Down Expand Up @@ -93,14 +94,14 @@ func (e *SetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
continue
}

if err := e.setSysVariable(name, v); err != nil {
if err := e.setSysVariable(ctx, name, v); err != nil {
return err
}
}
return nil
}

func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) error {
func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expression.VarAssignment) error {
sessionVars := e.ctx.GetSessionVars()
sysVar := variable.GetSysVar(name)
if sysVar == nil {
Expand Down Expand Up @@ -159,15 +160,24 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e
}
newSnapshotTS := getSnapshotTSByName()
newSnapshotIsSet := newSnapshotTS > 0 && newSnapshotTS != oldSnapshotTS
// We don't check snapshot with gc safe point for read_ts
// Client-go will automatically check the snapshotTS with gc safe point. It's unnecessary to check gc safe point during set executor.
if newSnapshotIsSet && name != variable.TiDBTxnReadTS {
err = gcutil.ValidateSnapshot(e.ctx, newSnapshotTS)
if newSnapshotIsSet {
if name == variable.TiDBTxnReadTS {
err = sessionctx.ValidateStaleReadTS(ctx, e.ctx, newSnapshotTS)
} else {
err = sessionctx.ValidateSnapshotReadTS(ctx, e.ctx, newSnapshotTS)
// Also check gc safe point for snapshot read.
// We don't check snapshot with gc safe point for read_ts
// Client-go will automatically check the snapshotTS with gc safe point. It's unnecessary to check gc safe point during set executor.
if err == nil {
err = gcutil.ValidateSnapshot(e.ctx, newSnapshotTS)
}
}
if err != nil {
fallbackOldSnapshotTS()
return err
}
}

err = e.loadSnapshotInfoSchemaIfNeeded(newSnapshotTS)
if err != nil {
fallbackOldSnapshotTS()
Expand Down
22 changes: 22 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,3 +938,25 @@ func (s *testStaleTxnSuite) TestStaleSelect(c *C) {
time.Sleep(tolerance)
tk.MustQuery(fmt.Sprintf("select * from t as of timestamp '%s' where c=5", time6.Format("2006-1-2 15:04:05.000"))).Check(testkit.Rows("4 5 <nil>"))
}

func (s *testStaleTxnSuite) TestStaleReadFutureTime(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
defer tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int)")

// Setting tx_read_ts to a time in the future will fail. (One day before the 2038 problem)
_, err := tk.Exec("start transaction read only as of timestamp '2038-01-18 03:14:07'")
c.Assert(err, ErrorMatches, "cannot set read timestamp to a future time")
// Transaction should not be started and read ts should not be set if check fails
c.Assert(tk.Se.GetSessionVars().InTxn(), IsFalse)
c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(0))

_, err = tk.Exec("set transaction read only as of timestamp '2038-01-18 03:14:07'")
c.Assert(err, ErrorMatches, "cannot set read timestamp to a future time")
c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(0))

_, err = tk.Exec("select * from t as of timestamp '2038-01-18 03:14:07'")
c.Assert(err, ErrorMatches, "cannot set read timestamp to a future time")
}
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func RegisterMetrics() {
prometheus.MustRegister(StatementDeadlockDetectDuration)
prometheus.MustRegister(StatementPessimisticRetryCount)
prometheus.MustRegister(StatementLockKeysCount)
prometheus.MustRegister(ValidateReadTSFromPDCount)
prometheus.MustRegister(UpdateSelfVersionHistogram)
prometheus.MustRegister(UpdateStatsCounter)
prometheus.MustRegister(WatchOwnerCounter)
Expand Down
8 changes: 8 additions & 0 deletions metrics/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ var (
Help: "Keys locking for a single statement",
Buckets: prometheus.ExponentialBuckets(1, 2, 21), // 1 ~ 1048576
})

ValidateReadTSFromPDCount = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "session",
Name: "validate_read_ts_from_pd_count",
Help: "Counter of validating read ts by getting a timestamp from PD",
})
)

// Label constants.
Expand Down
3 changes: 3 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2453,6 +2453,9 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
if err != nil {
return nil, err
}
if err := sessionctx.ValidateStaleReadTS(ctx, b.ctx, startTS); err != nil {
return nil, err
}
p.StaleTxnStartTS = startTS
} else if readTS > 0 {
p.StaleTxnStartTS = readTS
Expand Down
5 changes: 5 additions & 0 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package core

import (
"context"
"fmt"
"math"
"strings"
Expand Down Expand Up @@ -1525,6 +1526,10 @@ func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) {
if p.err != nil {
return
}
if err := sessionctx.ValidateStaleReadTS(context.Background(), p.ctx, ts); err != nil {
p.err = errors.Trace(err)
return
}
if !p.initedLastSnapshotTS {
p.SnapshotTSEvaluator = func(ctx sessionctx.Context) (uint64, error) {
return calculateTsExpr(ctx, node)
Expand Down
42 changes: 42 additions & 0 deletions sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@ package sessionctx
import (
"context"
"fmt"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/sli"
"github.com/pingcap/tipb/go-binlog"
"github.com/tikv/client-go/v2/oracle"
)

// InfoschemaMetaVersion is a workaround. Due to circular dependency,
Expand Down Expand Up @@ -149,3 +153,41 @@ const (
// LastExecuteDDL is the key for whether the session execute a ddl command last time.
LastExecuteDDL basicCtxType = 3
)

// ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp
func ValidateSnapshotReadTS(ctx context.Context, sctx Context, readTS uint64) error {
latestTS, err := sctx.GetStore().GetOracle().GetLowResolutionTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
// If we fail to get latestTS or the readTS exceeds it, get a timestamp from PD to double check
if err != nil || readTS > latestTS {
metrics.ValidateReadTSFromPDCount.Inc()
currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope)
if err != nil {
return errors.Errorf("fail to validate read timestamp: %v", err)
}
if readTS > currentVer.Ver {
return errors.Errorf("cannot set read timestamp to a future time")
}
}
return nil
}

// How far future from now ValidateStaleReadTS allows at most
const allowedTimeFromNow = 100 * time.Millisecond

// ValidateStaleReadTS validates that readTS does not exceed the current time not strictly.
func ValidateStaleReadTS(ctx context.Context, sctx Context, readTS uint64) error {
currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
// If we fail to calculate currentTS from local time, fallback to get a timestamp from PD
if err != nil {
metrics.ValidateReadTSFromPDCount.Inc()
currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope)
if err != nil {
return errors.Errorf("fail to validate read timestamp: %v", err)
}
currentTS = currentVer.Ver
}
if oracle.GetTimeFromTS(readTS).After(oracle.GetTimeFromTS(currentTS).Add(allowedTimeFromNow)) {
return errors.Errorf("cannot set read timestamp to a future time")
}
return nil
}

0 comments on commit 92ddceb

Please sign in to comment.