Skip to content

Commit

Permalink
infoschema: rewrite SchemaSimpleTableInfos for infoschema v2 to avoid…
Browse files Browse the repository at this point in the history
… network (#55088)

ref #50959
  • Loading branch information
tiancaiamao authored Aug 6, 2024
1 parent 1985662 commit 1fb4019
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 24 deletions.
47 changes: 24 additions & 23 deletions pkg/infoschema/infoschema_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,33 +795,34 @@ func (is *infoschemaV2) SchemaSimpleTableInfos(ctx context.Context, schema model
return nil, nil // something wrong?
}

retry:
dbInfo, ok := is.SchemaByName(schema)
if !ok {
// Ascend is much more difficult than Descend.
// So the data is taken out first and then dedup in Descend order.
var tableItems []tableItem
is.byName.Ascend(tableItem{dbName: schema.L}, func(item tableItem) bool {
if item.dbName != schema.L {
return false
}
if is.infoSchema.schemaMetaVersion >= item.schemaVersion {
tableItems = append(tableItems, item)
}
return true
})
if len(tableItems) == 0 {
return nil, nil
}
snapshot := is.r.Store().GetSnapshot(kv.NewVersion(is.ts))
// Using the KV timeout read feature to address the issue of potential DDL lease expiration when
// the meta region leader is slow.
snapshot.SetOption(kv.TiKVClientReadTimeout, uint64(3000)) // 3000ms.
m := meta.NewSnapshotMeta(snapshot)
tblInfos, err := m.ListSimpleTables(dbInfo.ID)
if err != nil {
if meta.ErrDBNotExists.Equal(err) {
return nil, nil
}
// Flashback statement could cause such kind of error.
// In theory that error should be handled in the lower layer, like client-go.
// But it's not done, so we retry here.
if strings.Contains(err.Error(), "in flashback progress") {
select {
case <-time.After(200 * time.Millisecond):
case <-ctx.Done():
return nil, ctx.Err()
tblInfos := make([]*model.TableNameInfo, 0, len(tableItems))
var curr *tableItem
for i := len(tableItems) - 1; i >= 0; i-- {
item := &tableItems[i]
if curr == nil || curr.tableName != tableItems[i].tableName {
curr = item
if !item.tomb {
tblInfos = append(tblInfos, &model.TableNameInfo{
ID: item.tableID,
Name: model.NewCIStr(item.tableName),
})
}
goto retry
}
return nil, errors.Trace(err)
}
return tblInfos, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/infoschema/test/infoschemav2test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"v2_test.go",
],
flaky = True,
shard_count = 8,
shard_count = 9,
deps = [
"//pkg/domain",
"//pkg/domain/infosync",
Expand Down
58 changes: 58 additions & 0 deletions pkg/infoschema/test/infoschemav2test/v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"slices"
"sort"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -428,3 +429,60 @@ func TestIssue54926(t *testing.T) {
require.Equal(t, time2TS, tk.Session().GetSessionVars().TxnReadTS.PeakTxnReadTS())
require.Equal(t, schemaVer2, tk.Session().GetInfoSchema().SchemaMetaVersion())
}

func TestSchemaSimpleTableInfos(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
safePointName := "tikv_gc_safe_point"
safePointValue := "20160102-15:04:05 -0700"
safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)"
updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s')
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
tk.MustExec(updateSafePoint)
tk.MustExec("set @@global.tidb_schema_cache_size = '512MB'")

tk.MustExec("create database aaa")
tk.MustExec("create database zzz")
tk.MustExec("drop database zzz")

tk.MustExec("create database simple")
tk.MustExec("use simple")
tk.MustExec("create table t1 (id int)")
tk.MustExec("create table t2 (id int)")

time1 := time.Now()
time.Sleep(50 * time.Millisecond)

tk.MustExec("rename table simple.t2 to aaa.t2")
tk.MustExec("drop database aaa")

is := tk.Session().GetInfoSchema()
// Cover special schema
tblInfos, err := is.SchemaSimpleTableInfos(context.Background(), model.NewCIStr("INFORMATION_SCHEMA"))
require.NoError(t, err)
res := make([]string, 0, len(tblInfos))
for _, tbl := range tblInfos {
res = append(res, tbl.Name.L)
}
sort.Strings(res)
tk.MustQuery("select lower(table_name) from information_schema.tables where table_schema = 'information_schema'").
Sort().Check(testkit.Rows(res...))

// Cover normal schema
tblInfos, err = is.SchemaSimpleTableInfos(context.Background(), model.NewCIStr("simple"))
require.NoError(t, err)
require.Len(t, tblInfos, 1)
require.Equal(t, tblInfos[0].Name.L, "t1")

// Cover snapshot infoschema
tk.MustExec(fmt.Sprintf(`set @@tidb_snapshot="%s"`, time1.Format("2006-1-2 15:04:05.000")))
is = tk.Session().GetInfoSchema()
tblInfos, err = is.SchemaSimpleTableInfos(context.Background(), model.NewCIStr("simple"))
require.NoError(t, err)
require.Len(t, tblInfos, 2)
require.Equal(t, tblInfos[0].Name.L, "t2")
require.Equal(t, tblInfos[1].Name.L, "t1")
}

0 comments on commit 1fb4019

Please sign in to comment.