Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support reading historial data for infoschema v2 #51681

Merged
merged 9 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
797 changes: 581 additions & 216 deletions MODULE.bazel.lock

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
// 4. No regenrated schema diff.
startTime := time.Now()
if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < LoadSchemaDiffVersionGapThreshold {
is, relatedChanges, diffTypes, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion)
is, relatedChanges, diffTypes, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion, schemaTs)
if err == nil {
infoschema_metrics.LoadSchemaDurationLoadDiff.Observe(time.Since(startTime).Seconds())
do.infoCache.Insert(is, uint64(schemaTs))
Expand Down Expand Up @@ -315,13 +315,13 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
zap.Int64("neededSchemaVersion", neededSchemaVersion),
zap.Duration("start time", time.Since(startTime)))

is := newISBuilder.Build()
is := newISBuilder.Build(schemaTs)
do.infoCache.Insert(is, uint64(schemaTs))
return is, false, currentSchemaVersion, nil, nil
}

// Returns the timestamp of a schema version, which is the commit timestamp of the schema diff
func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64, startTS uint64) (int64, error) {
func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64, startTS uint64) (uint64, error) {
tikvStore, ok := do.Store().(helper.Storage)
if ok {
newHelper := helper.NewHelper(tikvStore)
Expand All @@ -332,7 +332,7 @@ func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, ver
if mvccResp == nil || mvccResp.Info == nil || len(mvccResp.Info.Writes) == 0 {
return 0, errors.Errorf("There is no Write MVCC info for the schema version")
}
return int64(mvccResp.Info.Writes[0].CommitTs), nil
return mvccResp.Info.Writes[0].CommitTs, nil
}
return 0, errors.Errorf("cannot get store from domain")
}
Expand Down Expand Up @@ -439,7 +439,7 @@ func (*Domain) fetchSchemasWithTables(schemas []*model.DBInfo, m *meta.Meta, don
// Return true if the schema is loaded successfully.
// Return false if the schema can not be loaded by schema diff, then we need to do full load.
// The second returned value is the delta updated table and partition IDs.
func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, []string, error) {
func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64, schemaTS uint64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, []string, error) {
var diffs []*model.SchemaDiff
for usedVersion < newVersion {
usedVersion++
Expand Down Expand Up @@ -480,7 +480,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
}
}

is := builder.Build()
is := builder.Build(schemaTS)
relatedChange := transaction.RelatedSchemaChange{}
relatedChange.PhyTblIDS = phyTblIDs
relatedChange.ActionTypes = actions
Expand Down
19 changes: 11 additions & 8 deletions pkg/executor/show_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ func TestShowStatusSnapshot(t *testing.T) {
tk.MustExec("drop database if exists test;")
tk.MustExec("create database test;")
tk.MustExec("use test;")
tk.MustExec("create table t (a int);")

// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
safePointName := "tikv_gc_safe_point"
Expand All @@ -290,13 +289,17 @@ func TestShowStatusSnapshot(t *testing.T) {
UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
tk.MustExec(updateSafePoint)

snapshotTime := time.Now()

tk.MustExec("drop table t;")
tk.MustQuery("show table status;").Check(testkit.Rows())
tk.MustExec("set @@tidb_snapshot = '" + snapshotTime.Format("2006-01-02 15:04:05.999999") + "'")
result := tk.MustQuery("show table status;")
require.Equal(t, "t", result.Rows()[0][0])
for _, cacheSize := range []int{1024, 0} {
tk.MustExec("set @@global.tidb_schema_cache_size = ?", cacheSize)
tk.MustExec("create table t (a int);")
snapshotTime := time.Now()
tk.MustExec("drop table t;")
tk.MustQuery("show table status;").Check(testkit.Rows())
tk.MustExec("set @@tidb_snapshot = '" + snapshotTime.Format("2006-01-02 15:04:05.999999") + "'")
result := tk.MustQuery("show table status;")
require.Equal(t, "t", result.Rows()[0][0])
tk.MustExec("set @@tidb_snapshot = null;")
}
}

func TestShowStatsExtended(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/slow_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"bytes"
"context"
"fmt"
"math"
"os"
"runtime/pprof"
"strings"
Expand Down Expand Up @@ -61,7 +62,7 @@ func newSlowQueryRetriever() (*slowQueryRetriever, error) {
if err != nil {
return nil, err
}
is := newISBuilder.Build()
is := newISBuilder.Build(math.MaxUint64)
tbl, err := is.TableByName(util.InformationSchemaName, model.NewCIStr(infoschema.TableSlowQuery))
if err != nil {
return nil, err
Expand Down
8 changes: 8 additions & 0 deletions pkg/executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,14 @@ func TestSetTransactionInfoSchema(t *testing.T) {
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
tk.MustExec(updateSafePoint)

for _, cacheSize := range []int{1024, 0} {
tk.MustExec("set @@global.tidb_schema_cache_size = ?", cacheSize)
testSetTransactionInfoSchema(t, tk)
}
}

func testSetTransactionInfoSchema(t *testing.T, tk *testkit.TestKit) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
defer tk.MustExec("drop table if exists t")
Expand Down
7 changes: 4 additions & 3 deletions pkg/executor/stmtsummary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor

import (
"context"
"math"
"os"
"testing"
"time"
Expand All @@ -33,7 +34,7 @@ func TestStmtSummaryRetriverV2_TableStatementsSummary(t *testing.T) {
data := infoschema.NewData()
infoSchemaBuilder, err := infoschema.NewBuilder(nil, nil, data).InitWithDBInfos(nil, nil, nil, 0)
require.NoError(t, err)
infoSchema := infoSchemaBuilder.Build()
infoSchema := infoSchemaBuilder.Build(math.MaxUint64)
table, err := infoSchema.TableByName(util.InformationSchemaName, model.NewCIStr(infoschema.TableStatementsSummary))
require.NoError(t, err)
columns := table.Meta().Columns
Expand Down Expand Up @@ -77,7 +78,7 @@ func TestStmtSummaryRetriverV2_TableStatementsSummaryEvicted(t *testing.T) {
data := infoschema.NewData()
infoSchemaBuilder, err := infoschema.NewBuilder(nil, nil, data).InitWithDBInfos(nil, nil, nil, 0)
require.NoError(t, err)
infoSchema := infoSchemaBuilder.Build()
infoSchema := infoSchemaBuilder.Build(math.MaxUint64)
table, err := infoSchema.TableByName(util.InformationSchemaName, model.NewCIStr(infoschema.TableStatementsSummaryEvicted))
require.NoError(t, err)
columns := table.Meta().Columns
Expand Down Expand Up @@ -156,7 +157,7 @@ func TestStmtSummaryRetriverV2_TableStatementsSummaryHistory(t *testing.T) {
data := infoschema.NewData()
infoSchemaBuilder, err := infoschema.NewBuilder(nil, nil, data).InitWithDBInfos(nil, nil, nil, 0)
require.NoError(t, err)
infoSchema := infoSchemaBuilder.Build()
infoSchema := infoSchemaBuilder.Build(math.MaxUint64)
table, err := infoSchema.TableByName(util.InformationSchemaName, model.NewCIStr(infoschema.TableStatementsSummaryHistory))
require.NoError(t, err)
columns := table.Meta().Columns
Expand Down
41 changes: 6 additions & 35 deletions pkg/infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"cmp"
"context"
"fmt"
"math"
"slices"
"strings"

Expand Down Expand Up @@ -99,7 +98,7 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
case model.ActionFlashbackCluster:
return []int64{-1}, nil
default:
return b.applyDefaultAction(m, diff)
return applyDefaultAction(b, m, diff)
}
}

Expand Down Expand Up @@ -311,7 +310,7 @@ func (b *Builder) applyAffectedOpts(m *meta.Meta, tblIDs []int64, diff *model.Sc
return tblIDs, nil
}

func (b *Builder) applyDefaultAction(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
func applyDefaultAction(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
tblIDs, err := applyTableUpdate(b, m, diff)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -431,7 +430,7 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6
if tableIDIsValid(newTableID) {
// All types except DropTableOrView.
var err error
tblIDs, err = b.applyCreateTable(m, dbInfo, newTableID, allocs, diff.Type, tblIDs, diff.Version)
tblIDs, err = applyCreateTable(b, m, dbInfo, newTableID, allocs, diff.Type, tblIDs, diff.Version)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -547,31 +546,6 @@ func (b *Builder) applyDropSchema(diff *model.SchemaDiff) []int64 {
return tableIDs
}

func (b *Builder) applyDropTableV2(diff *model.SchemaDiff, dbInfo *model.DBInfo, tableID int64, affected []int64) []int64 {
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
// Remove the table in temporaryTables
if b.infoSchemaMisc.temporaryTableIDs != nil {
delete(b.infoSchemaMisc.temporaryTableIDs, tableID)
}

table, ok := b.infoschemaV2.TableByID(tableID)

if !ok {
return nil
}

b.infoData.delete(tableItem{
dbName: dbInfo.Name.L,
dbID: dbInfo.ID,
tableName: table.Meta().Name.L,
tableID: table.Meta().ID,
schemaVersion: diff.Version,
})

// The old DBInfo still holds a reference to old table info, we need to remove it.
b.deleteReferredForeignKeys(dbInfo, tableID)
return affected
}

func (b *Builder) applyRecoverSchema(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
if di, ok := b.infoSchema.SchemaByID(diff.SchemaID); ok {
return nil, ErrDatabaseExists.GenWithStackByArgs(
Expand Down Expand Up @@ -655,7 +629,7 @@ func (b *Builder) buildAllocsForCreateTable(tp model.ActionType, dbInfo *model.D
return autoid.NewAllocatorsFromTblInfo(b.Requirement, dbInfo.ID, tblInfo)
}

func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID int64, allocs autoid.Allocators, tp model.ActionType, affected []int64, schemaVersion int64) ([]int64, error) {
func applyCreateTable(b *Builder, m *meta.Meta, dbInfo *model.DBInfo, tableID int64, allocs autoid.Allocators, tp model.ActionType, affected []int64, schemaVersion int64) ([]int64, error) {
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
tblInfo, err := m.GetTable(dbInfo.ID, tableID)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -751,9 +725,6 @@ func ConvertOldVersionUTF8ToUTF8MB4IfNeed(tbInfo *model.TableInfo) {
}

func (b *Builder) applyDropTable(diff *model.SchemaDiff, dbInfo *model.DBInfo, tableID int64, affected []int64) []int64 {
if b.enableV2 {
return b.applyDropTableV2(diff, dbInfo, tableID, affected)
}
bucketIdx := tableBucketIdx(tableID)
sortedTbls := b.infoSchema.sortedTablesBuckets[bucketIdx]
idx := sortedTbls.searchTable(tableID)
Expand Down Expand Up @@ -792,10 +763,10 @@ func (b *Builder) deleteReferredForeignKeys(dbInfo *model.DBInfo, tableID int64)
}

// Build builds and returns the built infoschema.
func (b *Builder) Build() InfoSchema {
func (b *Builder) Build(schemaTS uint64) InfoSchema {
b.updateInfoSchemaBundles(b.infoSchema)
if b.enableV2 {
b.infoschemaV2.ts = math.MaxUint64 // TODO: should be the correct TS
b.infoschemaV2.ts = schemaTS
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
b.infoschemaV2.schemaVersion = b.infoSchema.SchemaMetaVersion()
return &b.infoschemaV2
}
Expand Down
46 changes: 43 additions & 3 deletions pkg/infoschema/infoschema_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (isd *Data) addDB(schemaVersion int64, dbInfo *model.DBInfo) {
isd.schemaMap.Set(schemaItem{schemaVersion: schemaVersion, dbInfo: dbInfo})
}

func (isd *Data) delete(item tableItem) {
func (isd *Data) remove(item tableItem) {
isd.tableCache.Remove(tableCacheKey{item.tableID, item.schemaVersion})
}

Expand Down Expand Up @@ -212,7 +212,14 @@ func compareByName(a, b tableItem) bool {
return false
}

return a.tableID < b.tableID
if a.tableID < b.tableID {
return true
}
if a.tableID > b.tableID {
return false
}

return a.schemaVersion < b.schemaVersion
}

func compareSchemaItem(a, b schemaItem) bool {
Expand Down Expand Up @@ -616,6 +623,13 @@ func applyDropTableOrPartition(b *Builder, m *meta.Meta, diff *model.SchemaDiff)
return b.applyDropTableOrPartition(m, diff)
}

func applyDropTable(b *Builder, diff *model.SchemaDiff, dbInfo *model.DBInfo, tableID int64, affected []int64) []int64 {
if b.enableV2 {
return b.applyDropTableV2(diff, dbInfo, tableID, affected)
}
return b.applyDropTable(diff, dbInfo, tableID, affected)
}

func applyRecoverTable(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
if b.enableV2 {
return b.applyRecoverTableV2(m, diff)
Expand Down Expand Up @@ -664,10 +678,11 @@ func (b *Builder) applyTableUpdateV2(m *meta.Meta, diff *model.SchemaDiff) ([]in
if tableIDIsValid(newTableID) {
// All types except DropTableOrView.
var err error
tblIDs, err = b.applyCreateTable(m, oldDBInfo, newTableID, allocs, diff.Type, tblIDs, diff.Version)
tblIDs, err = applyCreateTable(b, m, oldDBInfo, newTableID, allocs, diff.Type, tblIDs, diff.Version)
if err != nil {
return nil, errors.Trace(err)
}

}
return tblIDs, nil
}
Expand All @@ -692,6 +707,31 @@ func (b *Builder) applyDropSchemaV2(diff *model.SchemaDiff) []int64 {
return tableIDs
}

func (b *Builder) applyDropTableV2(diff *model.SchemaDiff, dbInfo *model.DBInfo, tableID int64, affected []int64) []int64 {
// Remove the table in temporaryTables
if b.infoSchemaMisc.temporaryTableIDs != nil {
delete(b.infoSchemaMisc.temporaryTableIDs, tableID)
}

table, ok := b.infoschemaV2.TableByID(tableID)

if !ok {
return nil
}

b.infoData.remove(tableItem{
dbName: dbInfo.Name.L,
dbID: dbInfo.ID,
tableName: table.Meta().Name.L,
tableID: table.Meta().ID,
schemaVersion: diff.Version,
})

// The old DBInfo still holds a reference to old table info, we need to remove it.
b.deleteReferredForeignKeys(dbInfo, tableID)
return affected
}

func (b *Builder) applyRecoverSchemaV2(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
panic("TODO")
}
Expand Down