Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support reading historial data for infoschema v2 #51681

Merged
merged 9 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func TestGetExistedUserDBs(t *testing.T) {
},
nil, nil, 1)
require.Nil(t, err)
dom.MockInfoCacheAndLoadInfoSchema(builder.Build())
dom.MockInfoCacheAndLoadInfoSchema(builder.Build(math.MaxUint64))
dbs = restore.GetExistedUserDBs(dom)
require.Equal(t, 0, len(dbs))

Expand All @@ -396,7 +396,7 @@ func TestGetExistedUserDBs(t *testing.T) {
},
nil, nil, 1)
require.Nil(t, err)
dom.MockInfoCacheAndLoadInfoSchema(builder.Build())
dom.MockInfoCacheAndLoadInfoSchema(builder.Build(math.MaxUint64))
dbs = restore.GetExistedUserDBs(dom)
require.Equal(t, 1, len(dbs))

Expand All @@ -412,7 +412,7 @@ func TestGetExistedUserDBs(t *testing.T) {
},
nil, nil, 1)
require.Nil(t, err)
dom.MockInfoCacheAndLoadInfoSchema(builder.Build())
dom.MockInfoCacheAndLoadInfoSchema(builder.Build(math.MaxUint64))
dbs = restore.GetExistedUserDBs(dom)
require.Equal(t, 2, len(dbs))
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/placement_policy_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package ddl_test

import (
"context"
"math"
"testing"

"github.com/pingcap/tidb/pkg/ddl"
Expand Down Expand Up @@ -125,7 +126,7 @@ func TestPlacementPolicyInUse(t *testing.T) {
1,
)
require.NoError(t, err)
is := builder.Build()
is := builder.Build(math.MaxUint64)

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
for _, policy := range []*model.PolicyInfo{p1, p2, p4, p5} {
Expand Down
18 changes: 9 additions & 9 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
if is := do.infoCache.GetByVersion(neededSchemaVersion); is != nil {
// try to insert here as well to correct the schemaTs if previous is wrong
// the insert method check if schemaTs is zero
do.infoCache.Insert(is, uint64(schemaTs))
do.infoCache.Insert(is, schemaTs)

enableV2 := variable.SchemaCacheSize.Load() > 0
isV2 := infoschema.IsV2(is)
Expand All @@ -271,10 +271,10 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
// 4. No regenrated schema diff.
startTime := time.Now()
if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < LoadSchemaDiffVersionGapThreshold {
is, relatedChanges, diffTypes, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion)
is, relatedChanges, diffTypes, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion, schemaTs)
if err == nil {
infoschema_metrics.LoadSchemaDurationLoadDiff.Observe(time.Since(startTime).Seconds())
do.infoCache.Insert(is, uint64(schemaTs))
do.infoCache.Insert(is, schemaTs)
logutil.BgLogger().Info("diff load InfoSchema success",
zap.Int64("currentSchemaVersion", currentSchemaVersion),
zap.Int64("neededSchemaVersion", neededSchemaVersion),
Expand Down Expand Up @@ -315,13 +315,13 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
zap.Int64("neededSchemaVersion", neededSchemaVersion),
zap.Duration("start time", time.Since(startTime)))

is := newISBuilder.Build()
do.infoCache.Insert(is, uint64(schemaTs))
is := newISBuilder.Build(schemaTs)
do.infoCache.Insert(is, schemaTs)
return is, false, currentSchemaVersion, nil, nil
}

// Returns the timestamp of a schema version, which is the commit timestamp of the schema diff
func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64, startTS uint64) (int64, error) {
func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, version int64, startTS uint64) (uint64, error) {
tikvStore, ok := do.Store().(helper.Storage)
if ok {
newHelper := helper.NewHelper(tikvStore)
Expand All @@ -332,7 +332,7 @@ func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, ver
if mvccResp == nil || mvccResp.Info == nil || len(mvccResp.Info.Writes) == 0 {
return 0, errors.Errorf("There is no Write MVCC info for the schema version")
}
return int64(mvccResp.Info.Writes[0].CommitTs), nil
return mvccResp.Info.Writes[0].CommitTs, nil
}
return 0, errors.Errorf("cannot get store from domain")
}
Expand Down Expand Up @@ -439,7 +439,7 @@ func (*Domain) fetchSchemasWithTables(schemas []*model.DBInfo, m *meta.Meta, don
// Return true if the schema is loaded successfully.
// Return false if the schema can not be loaded by schema diff, then we need to do full load.
// The second returned value is the delta updated table and partition IDs.
func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, []string, error) {
func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64, schemaTS uint64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, []string, error) {
var diffs []*model.SchemaDiff
for usedVersion < newVersion {
usedVersion++
Expand Down Expand Up @@ -480,7 +480,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
}
}

is := builder.Build()
is := builder.Build(schemaTS)
relatedChange := transaction.RelatedSchemaChange{}
relatedChange.PhyTblIDS = phyTblIDs
relatedChange.ActionTypes = actions
Expand Down
19 changes: 11 additions & 8 deletions pkg/executor/show_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ func TestShowStatusSnapshot(t *testing.T) {
tk.MustExec("drop database if exists test;")
tk.MustExec("create database test;")
tk.MustExec("use test;")
tk.MustExec("create table t (a int);")

// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
safePointName := "tikv_gc_safe_point"
Expand All @@ -293,13 +292,17 @@ func TestShowStatusSnapshot(t *testing.T) {
UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
tk.MustExec(updateSafePoint)

snapshotTime := time.Now()

tk.MustExec("drop table t;")
tk.MustQuery("show table status;").Check(testkit.Rows())
tk.MustExec("set @@tidb_snapshot = '" + snapshotTime.Format("2006-01-02 15:04:05.999999") + "'")
result := tk.MustQuery("show table status;")
require.Equal(t, "t", result.Rows()[0][0])
for _, cacheSize := range []int{1024, 0} {
tk.MustExec("set @@global.tidb_schema_cache_size = ?", cacheSize)
tk.MustExec("create table t (a int);")
snapshotTime := time.Now()
tk.MustExec("drop table t;")
tk.MustQuery("show table status;").Check(testkit.Rows())
tk.MustExec("set @@tidb_snapshot = '" + snapshotTime.Format("2006-01-02 15:04:05.999999") + "'")
result := tk.MustQuery("show table status;")
require.Equal(t, "t", result.Rows()[0][0])
tk.MustExec("set @@tidb_snapshot = null;")
}
}

func TestShowStatsExtended(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/slow_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"bytes"
"context"
"fmt"
"math"
"os"
"runtime/pprof"
"strings"
Expand Down Expand Up @@ -61,7 +62,7 @@ func newSlowQueryRetriever() (*slowQueryRetriever, error) {
if err != nil {
return nil, err
}
is := newISBuilder.Build()
is := newISBuilder.Build(math.MaxUint64)
tbl, err := is.TableByName(util.InformationSchemaName, model.NewCIStr(infoschema.TableSlowQuery))
if err != nil {
return nil, err
Expand Down
8 changes: 8 additions & 0 deletions pkg/executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,14 @@ func TestSetTransactionInfoSchema(t *testing.T) {
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
tk.MustExec(updateSafePoint)

for _, cacheSize := range []int{1024, 0} {
tk.MustExec("set @@global.tidb_schema_cache_size = ?", cacheSize)
testSetTransactionInfoSchema(t, tk)
}
}

func testSetTransactionInfoSchema(t *testing.T, tk *testkit.TestKit) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
defer tk.MustExec("drop table if exists t")
Expand Down
7 changes: 4 additions & 3 deletions pkg/executor/stmtsummary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor

import (
"context"
"math"
"os"
"testing"
"time"
Expand All @@ -33,7 +34,7 @@ func TestStmtSummaryRetriverV2_TableStatementsSummary(t *testing.T) {
data := infoschema.NewData()
infoSchemaBuilder, err := infoschema.NewBuilder(nil, nil, data).InitWithDBInfos(nil, nil, nil, 0)
require.NoError(t, err)
infoSchema := infoSchemaBuilder.Build()
infoSchema := infoSchemaBuilder.Build(math.MaxUint64)
table, err := infoSchema.TableByName(util.InformationSchemaName, model.NewCIStr(infoschema.TableStatementsSummary))
require.NoError(t, err)
columns := table.Meta().Columns
Expand Down Expand Up @@ -77,7 +78,7 @@ func TestStmtSummaryRetriverV2_TableStatementsSummaryEvicted(t *testing.T) {
data := infoschema.NewData()
infoSchemaBuilder, err := infoschema.NewBuilder(nil, nil, data).InitWithDBInfos(nil, nil, nil, 0)
require.NoError(t, err)
infoSchema := infoSchemaBuilder.Build()
infoSchema := infoSchemaBuilder.Build(math.MaxUint64)
table, err := infoSchema.TableByName(util.InformationSchemaName, model.NewCIStr(infoschema.TableStatementsSummaryEvicted))
require.NoError(t, err)
columns := table.Meta().Columns
Expand Down Expand Up @@ -156,7 +157,7 @@ func TestStmtSummaryRetriverV2_TableStatementsSummaryHistory(t *testing.T) {
data := infoschema.NewData()
infoSchemaBuilder, err := infoschema.NewBuilder(nil, nil, data).InitWithDBInfos(nil, nil, nil, 0)
require.NoError(t, err)
infoSchema := infoSchemaBuilder.Build()
infoSchema := infoSchemaBuilder.Build(math.MaxUint64)
table, err := infoSchema.TableByName(util.InformationSchemaName, model.NewCIStr(infoschema.TableStatementsSummaryHistory))
require.NoError(t, err)
columns := table.Meta().Columns
Expand Down
49 changes: 10 additions & 39 deletions pkg/infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"cmp"
"context"
"fmt"
"math"
"slices"
"strings"

Expand Down Expand Up @@ -99,7 +98,7 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
case model.ActionFlashbackCluster:
return []int64{-1}, nil
default:
return b.applyDefaultAction(m, diff)
return applyDefaultAction(b, m, diff)
}
}

Expand Down Expand Up @@ -311,7 +310,7 @@ func (b *Builder) applyAffectedOpts(m *meta.Meta, tblIDs []int64, diff *model.Sc
return tblIDs, nil
}

func (b *Builder) applyDefaultAction(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
func applyDefaultAction(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
tblIDs, err := applyTableUpdate(b, m, diff)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -367,7 +366,7 @@ func (b *Builder) updateBundleForTableUpdate(diff *model.SchemaDiff, newTableID,
}
}

func (b *Builder) dropTableForUpdate(newTableID, oldTableID int64, dbInfo *model.DBInfo, diff *model.SchemaDiff) ([]int64, autoid.Allocators, error) {
func dropTableForUpdate(b *Builder, newTableID, oldTableID int64, dbInfo *model.DBInfo, diff *model.SchemaDiff) ([]int64, autoid.Allocators, error) {
tblIDs := make([]int64, 0, 2)
var newAllocs autoid.Allocators
// We try to reuse the old allocator, so the cached auto ID can be reused.
Expand Down Expand Up @@ -398,9 +397,9 @@ func (b *Builder) dropTableForUpdate(newTableID, oldTableID int64, dbInfo *model
)
}
oldDBInfo := b.getSchemaAndCopyIfNecessary(oldRoDBInfo.Name.L)
tmpIDs = b.applyDropTable(diff, oldDBInfo, oldTableID, tmpIDs)
tmpIDs = applyDropTable(b, diff, oldDBInfo, oldTableID, tmpIDs)
} else {
tmpIDs = b.applyDropTable(diff, dbInfo, oldTableID, tmpIDs)
tmpIDs = applyDropTable(b, diff, dbInfo, oldTableID, tmpIDs)
}

if oldTableID != newTableID {
Expand All @@ -423,15 +422,15 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6
b.updateBundleForTableUpdate(diff, newTableID, oldTableID)
b.copySortedTables(oldTableID, newTableID)

tblIDs, allocs, err := b.dropTableForUpdate(newTableID, oldTableID, dbInfo, diff)
tblIDs, allocs, err := dropTableForUpdate(b, newTableID, oldTableID, dbInfo, diff)
if err != nil {
return nil, err
}

if tableIDIsValid(newTableID) {
// All types except DropTableOrView.
var err error
tblIDs, err = b.applyCreateTable(m, dbInfo, newTableID, allocs, diff.Type, tblIDs, diff.Version)
tblIDs, err = applyCreateTable(b, m, dbInfo, newTableID, allocs, diff.Type, tblIDs, diff.Version)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -547,31 +546,6 @@ func (b *Builder) applyDropSchema(diff *model.SchemaDiff) []int64 {
return tableIDs
}

func (b *Builder) applyDropTableV2(diff *model.SchemaDiff, dbInfo *model.DBInfo, tableID int64, affected []int64) []int64 {
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
// Remove the table in temporaryTables
if b.infoSchemaMisc.temporaryTableIDs != nil {
delete(b.infoSchemaMisc.temporaryTableIDs, tableID)
}

table, ok := b.infoschemaV2.TableByID(tableID)

if !ok {
return nil
}

b.infoData.delete(tableItem{
dbName: dbInfo.Name.L,
dbID: dbInfo.ID,
tableName: table.Meta().Name.L,
tableID: table.Meta().ID,
schemaVersion: diff.Version,
})

// The old DBInfo still holds a reference to old table info, we need to remove it.
b.deleteReferredForeignKeys(dbInfo, tableID)
return affected
}

func (b *Builder) applyRecoverSchema(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
if di, ok := b.infoSchema.SchemaByID(diff.SchemaID); ok {
return nil, ErrDatabaseExists.GenWithStackByArgs(
Expand Down Expand Up @@ -655,7 +629,7 @@ func (b *Builder) buildAllocsForCreateTable(tp model.ActionType, dbInfo *model.D
return autoid.NewAllocatorsFromTblInfo(b.Requirement, dbInfo.ID, tblInfo)
}

func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID int64, allocs autoid.Allocators, tp model.ActionType, affected []int64, schemaVersion int64) ([]int64, error) {
func applyCreateTable(b *Builder, m *meta.Meta, dbInfo *model.DBInfo, tableID int64, allocs autoid.Allocators, tp model.ActionType, affected []int64, schemaVersion int64) ([]int64, error) {
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
tblInfo, err := m.GetTable(dbInfo.ID, tableID)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -751,9 +725,6 @@ func ConvertOldVersionUTF8ToUTF8MB4IfNeed(tbInfo *model.TableInfo) {
}

func (b *Builder) applyDropTable(diff *model.SchemaDiff, dbInfo *model.DBInfo, tableID int64, affected []int64) []int64 {
if b.enableV2 {
return b.applyDropTableV2(diff, dbInfo, tableID, affected)
}
bucketIdx := tableBucketIdx(tableID)
sortedTbls := b.infoSchema.sortedTablesBuckets[bucketIdx]
idx := sortedTbls.searchTable(tableID)
Expand Down Expand Up @@ -792,9 +763,9 @@ func (b *Builder) deleteReferredForeignKeys(dbInfo *model.DBInfo, tableID int64)
}

// Build builds and returns the built infoschema.
func (b *Builder) Build() InfoSchema {
func (b *Builder) Build(schemaTS uint64) InfoSchema {
if b.enableV2 {
b.infoschemaV2.ts = math.MaxUint64 // TODO: should be the correct TS
b.infoschemaV2.ts = schemaTS
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
b.infoschemaV2.schemaVersion = b.infoSchema.SchemaMetaVersion()
updateInfoSchemaBundles(b)
return &b.infoschemaV2
Expand Down
Loading
Loading