Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

infoschema: remove the inspection_schema #15296

Merged
merged 6 commits into from
Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1320,6 +1320,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case util.MetricSchemaName.L:
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &MetricRetriever{
table: v.Table,
extractor: v.Extractor.(*plannercore.MetricTableExtractor),
Expand All @@ -1330,13 +1331,15 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case strings.ToLower(infoschema.TableClusterConfig):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &clusterConfigRetriever{
extractor: v.Extractor.(*plannercore.ClusterTableExtractor),
},
}
case strings.ToLower(infoschema.TableClusterLoad):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &clusterServerInfoRetriever{
extractor: v.Extractor.(*plannercore.ClusterTableExtractor),
serverInfoType: diagnosticspb.ServerInfoType_LoadInfo,
Expand All @@ -1345,6 +1348,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case strings.ToLower(infoschema.TableClusterHardware):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &clusterServerInfoRetriever{
extractor: v.Extractor.(*plannercore.ClusterTableExtractor),
serverInfoType: diagnosticspb.ServerInfoType_HardwareInfo,
Expand All @@ -1353,6 +1357,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case strings.ToLower(infoschema.TableClusterSystemInfo):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &clusterServerInfoRetriever{
extractor: v.Extractor.(*plannercore.ClusterTableExtractor),
serverInfoType: diagnosticspb.ServerInfoType_SystemInfo,
Expand All @@ -1361,13 +1366,15 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case strings.ToLower(infoschema.TableClusterLog):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &clusterLogRetriever{
extractor: v.Extractor.(*plannercore.ClusterLogTableExtractor),
},
}
case strings.ToLower(infoschema.TableInspectionResult):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &inspectionResultRetriever{
extractor: v.Extractor.(*plannercore.InspectionResultTableExtractor),
timeRange: v.QueryTimeRange,
Expand All @@ -1376,6 +1383,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case strings.ToLower(infoschema.TableInspectionSummary):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &inspectionSummaryRetriever{
table: v.Table,
extractor: v.Extractor.(*plannercore.InspectionSummaryTableExtractor),
Expand All @@ -1385,13 +1393,15 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case strings.ToLower(infoschema.TableInspectionRules):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &inspectionRuleRetriever{
extractor: v.Extractor.(*plannercore.InspectionRuleTableExtractor),
},
}
case strings.ToLower(infoschema.TableMetricSummary):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &MetricsSummaryRetriever{
table: v.Table,
extractor: v.Extractor.(*plannercore.MetricSummaryTableExtractor),
Expand All @@ -1401,6 +1411,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case strings.ToLower(infoschema.TableMetricSummaryByLabel):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &MetricsSummaryByLabelRetriever{
table: v.Table,
extractor: v.Extractor.(*plannercore.MetricSummaryTableExtractor),
Expand All @@ -1427,6 +1438,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
strings.ToLower(infoschema.TableConstraints):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &memtableRetriever{
table: v.Table,
columns: v.Columns,
Expand All @@ -1435,6 +1447,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case strings.ToLower(infoschema.TableSlowQuery), strings.ToLower(infoschema.ClusterTableSlowLog):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &slowQueryRetriever{
table: v.Table,
outputCols: v.Columns,
Expand Down
2 changes: 1 addition & 1 deletion executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func (e *memtableRetriever) setDataFromTables(ctx sessionctx.Context, schemas []
var tableType string
switch schema.Name.L {
case util.InformationSchemaName.L, util.PerformanceSchemaName.L,
util.MetricSchemaName.L, util.InspectionSchemaName.L:
util.MetricSchemaName.L:
tableType = "SYSTEM VIEW"
default:
tableType = "BASE TABLE"
Expand Down
67 changes: 67 additions & 0 deletions executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@ import (
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/server"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
"google.golang.org/grpc"
)
Expand All @@ -50,6 +54,8 @@ var _ = Suite(&testInfoschemaTableSuite{})
// if your test not change the TableStatsCacheExpiry variable, please use testInfoschemaTableSuite for test.
var _ = SerialSuites(&testInfoschemaTableSerialSuite{})

var _ = SerialSuites(&inspectionSuite{})

type testInfoschemaTableSuite struct {
store kv.Storage
dom *domain.Domain
Expand All @@ -60,6 +66,11 @@ type testInfoschemaTableSerialSuite struct {
dom *domain.Domain
}

type inspectionSuite struct {
store kv.Storage
dom *domain.Domain
}

func (s *testInfoschemaTableSerialSuite) SetUpSuite(c *C) {
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -92,6 +103,62 @@ func (s *testInfoschemaTableSuite) TearDownSuite(c *C) {
s.store.Close()
}

func (s *inspectionSuite) SetUpSuite(c *C) {
testleak.BeforeTest()

var err error
s.store, err = mockstore.NewMockTikvStore()
c.Assert(err, IsNil)
session.DisableStats4Test()
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
}

func (s *inspectionSuite) TearDownSuite(c *C) {
s.dom.Close()
s.store.Close()
testleak.AfterTest(c)()
}

func (s *inspectionSuite) TestInspectionTables(c *C) {
tk := testkit.NewTestKit(c, s.store)
instances := []string{
"pd,127.0.0.1:11080,127.0.0.1:10080,mock-version,mock-githash",
"tidb,127.0.0.1:11080,127.0.0.1:10080,mock-version,mock-githash",
"tikv,127.0.0.1:11080,127.0.0.1:10080,mock-version,mock-githash",
}
fpName := "github.com/pingcap/tidb/infoschema/mockClusterInfo"
fpExpr := `return("` + strings.Join(instances, ";") + `")`
c.Assert(failpoint.Enable(fpName, fpExpr), IsNil)
defer func() { c.Assert(failpoint.Disable(fpName), IsNil) }()

tk.MustQuery("select type, instance, status_address, version, git_hash from information_schema.cluster_info").Check(testkit.Rows(
"pd 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tidb 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
))

// enable inspection mode
inspectionTableCache := map[string]variable.TableSnapshot{}
tk.Se.GetSessionVars().InspectionTableCache = inspectionTableCache
tk.MustQuery("select type, instance, status_address, version, git_hash from information_schema.cluster_info").Check(testkit.Rows(
"pd 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tidb 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
))
c.Assert(inspectionTableCache["cluster_info"].Err, IsNil)
c.Assert(len(inspectionTableCache["cluster_info"].Rows), DeepEquals, 3)

// check whether is obtain data from cache at the next time
inspectionTableCache["cluster_info"].Rows[0][0].SetString("modified-pd", mysql.DefaultCollationName)
tk.MustQuery("select type, instance, status_address, version, git_hash from information_schema.cluster_info").Check(testkit.Rows(
"modified-pd 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tidb 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
))
tk.Se.GetSessionVars().InspectionTableCache = nil
}

func (s *testInfoschemaTableSuite) TestSchemataTables(c *C) {
tk := testkit.NewTestKit(c, s.store)

Expand Down
25 changes: 10 additions & 15 deletions executor/inspection_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,7 @@ func (e *inspectionResultRetriever) retrieve(ctx context.Context, sctx sessionct
// Some data of cluster-level memory tables will be retrieved many times in different inspection rules,
// and the cost of retrieving some data is expensive. We use the `TableSnapshot` to cache those data
// and obtain them lazily, and provide a consistent view of inspection tables for each inspection rules.
// All cached snapshots should be released at the end of retrieving. So all diagnosis rules should query
// `cluster_config/cluster_hardware/cluster_load/cluster_info` in `inspection_schema`.
// e.g:
// SELECT * FROM inspection_schema.cluster_config
// instead of:
// SELECT * FROM information_schema.cluster_config
// All cached snapshots should be released at the end of retrieving.
sctx.GetSessionVars().InspectionTableCache = map[string]variable.TableSnapshot{}
defer func() { sctx.GetSessionVars().InspectionTableCache = nil }()

Expand Down Expand Up @@ -206,7 +201,7 @@ func (configInspection) inspectDiffConfig(_ context.Context, sctx sessionctx.Con
"raftstore.raftdb-path",
"storage.data-dir",
}
sql := fmt.Sprintf("select type, `key`, count(distinct value) as c from inspection_schema.cluster_config where `key` not in ('%s') group by type, `key` having c > 1",
sql := fmt.Sprintf("select type, `key`, count(distinct value) as c from information_schema.cluster_config where `key` not in ('%s') group by type, `key` having c > 1",
strings.Join(ignoreConfigKey, "','"))
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
Expand Down Expand Up @@ -253,7 +248,7 @@ func (configInspection) inspectCheckConfig(_ context.Context, sctx sessionctx.Co
if !filter.enable(cas.key) {
continue
}
sql := fmt.Sprintf("select instance from inspection_schema.cluster_config where type = '%s' and `key` = '%s' and value = '%s'",
sql := fmt.Sprintf("select instance from information_schema.cluster_config where type = '%s' and `key` = '%s' and value = '%s'",
cas.tp, cas.key, cas.value)
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
Expand All @@ -277,7 +272,7 @@ func (configInspection) inspectCheckConfig(_ context.Context, sctx sessionctx.Co

func (versionInspection) inspect(_ context.Context, sctx sessionctx.Context, filter inspectionFilter) []inspectionResult {
// check the configuration consistent
sql := "select type, count(distinct git_hash) as c from inspection_schema.cluster_info group by type having c > 1;"
sql := "select type, count(distinct git_hash) as c from information_schema.cluster_info group by type having c > 1;"
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
sctx.GetSessionVars().StmtCtx.AppendWarning(fmt.Errorf("check version consistency failed: %v", err))
Expand Down Expand Up @@ -332,19 +327,19 @@ func (c currentLoadInspection) inspect(_ context.Context, sctx sessionctx.Contex
}{
{
"virtual-memory-usage",
"select type, instance, value from inspection_schema.cluster_load where device_type='memory' and device_name='virtual' and name='used-percent' and value > 0.7",
"select type, instance, value from information_schema.cluster_load where device_type='memory' and device_name='virtual' and name='used-percent' and value > 0.7",
"< 0.7",
commonResult,
},
{
"swap-memory-usage",
"select type, instance, value from inspection_schema.cluster_load where device_type='memory' and device_name='swap' and name='used-percent' and value > 0",
"select type, instance, value from information_schema.cluster_load where device_type='memory' and device_name='swap' and name='used-percent' and value > 0",
"0",
commonResult,
},
{
"disk-usage",
"select type, instance, device_name, value from inspection_schema.cluster_hardware where device_type='disk' and name='used-percent' and value > 70",
"select type, instance, device_name, value from information_schema.cluster_hardware where device_type='disk' and name='used-percent' and value > 70",
"< 70",
diskResult,
},
Expand Down Expand Up @@ -374,8 +369,8 @@ func (currentLoadInspection) inspectCPULoad(sctx sessionctx.Context, filter insp
continue
}
sql := fmt.Sprintf(`select t1.*, 0.7 * t2.cpu_core from
(select type, instance, value from inspection_schema.cluster_load where device_type='cpu' and device_name='cpu' and name='%s') as t1 join
(select type,instance, max(value) as cpu_core from inspection_schema.CLUSTER_HARDWARE where DEVICE_TYPE='cpu' and name='cpu-logical-cores' group by type,instance) as t2
(select type, instance, value from information_schema.cluster_load where device_type='cpu' and device_name='cpu' and name='%s') as t1 join
(select type,instance, max(value) as cpu_core from information_schema.CLUSTER_HARDWARE where DEVICE_TYPE='cpu' and name='cpu-logical-cores' group by type,instance) as t2
where t2.instance = t1.instance and t1.type=t2.type and t1.value > 0.7 * t2.cpu_core;`, item)
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
Expand Down Expand Up @@ -562,7 +557,7 @@ func (thresholdCheckInspection) inspectThreshold1(ctx context.Context, sctx sess
if len(rule.configKey) > 0 {
sql = fmt.Sprintf("select t1.instance, t1.cpu, t2.threshold, t2.value from "+
"(select instance, max(value) as cpu from metrics_schema.tikv_thread_cpu %[4]s and name like '%[1]s' group by instance) as t1,"+
"(select value * %[2]f as threshold, value from inspection_schema.cluster_config where type='tikv' and `key` = '%[3]s' limit 1) as t2 "+
"(select value * %[2]f as threshold, value from information_schema.cluster_config where type='tikv' and `key` = '%[3]s' limit 1) as t2 "+
"where t1.cpu > t2.threshold;", rule.component, rule.threshold, rule.configKey, condition)
} else {
sql = fmt.Sprintf("select t1.instance, t1.cpu, %[2]f from "+
Expand Down
44 changes: 43 additions & 1 deletion executor/memtable_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/terror"
"github.com/pingcap/sysutil"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/infoschema"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
Expand All @@ -61,12 +63,52 @@ type memTableRetriever interface {
// MemTableReaderExec executes memTable information retrieving from the MemTable components
type MemTableReaderExec struct {
baseExecutor
table *model.TableInfo
retriever memTableRetriever
// cacheRetrieved is used to indicate whether has the parent executor retrieved
// from inspection cache in inspection mode.
cacheRetrieved bool
}

func (e *MemTableReaderExec) isInspectionCacheableTable(tblName string) bool {
switch tblName {
case strings.ToLower(infoschema.TableClusterConfig),
strings.ToLower(infoschema.TableClusterInfo),
strings.ToLower(infoschema.TableClusterSystemInfo),
strings.ToLower(infoschema.TableClusterLoad),
strings.ToLower(infoschema.TableClusterHardware):
return true
default:
return false
}
}

// Next implements the Executor Next interface.
func (e *MemTableReaderExec) Next(ctx context.Context, req *chunk.Chunk) error {
rows, err := e.retriever.retrieve(ctx, e.ctx)
var (
rows [][]types.Datum
err error
)

// The `InspectionTableCache` will be assigned in the begin of retrieving` and be
// cleaned at the end of retrieving, so nil represents currently in non-inspection mode.
if cache, tbl := e.ctx.GetSessionVars().InspectionTableCache, e.table.Name.L; cache != nil &&
e.isInspectionCacheableTable(tbl) {
// TODO: cached rows will be returned fully, we should refactor this part.
if !e.cacheRetrieved {
// Obtain data from cache first.
cached, found := cache[tbl]
if !found {
rows, err := e.retriever.retrieve(ctx, e.ctx)
cached = variable.TableSnapshot{Rows: rows, Err: err}
cache[tbl] = cached
}
e.cacheRetrieved = true
rows, err = cached.Rows, cached.Err
}
} else {
rows, err = e.retriever.retrieve(ctx, e.ctx)
}
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions infoschema/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ func (*testSuite) TestT(c *C) {
is := handle.Get()

schemaNames := is.AllSchemaNames()
c.Assert(schemaNames, HasLen, 5)
c.Assert(testutil.CompareUnorderedStringSlice(schemaNames, []string{util.InformationSchemaName.O, util.MetricSchemaName.O, util.PerformanceSchemaName.O, "Test", util.InspectionSchemaName.O}), IsTrue)
c.Assert(schemaNames, HasLen, 4)
c.Assert(testutil.CompareUnorderedStringSlice(schemaNames, []string{util.InformationSchemaName.O, util.MetricSchemaName.O, util.PerformanceSchemaName.O, "Test"}), IsTrue)

schemas := is.AllSchemas()
c.Assert(schemas, HasLen, 5)
c.Assert(schemas, HasLen, 4)
schemas = is.Clone()
c.Assert(schemas, HasLen, 5)
c.Assert(schemas, HasLen, 4)

c.Assert(is.SchemaExists(dbName), IsTrue)
c.Assert(is.SchemaExists(noexist), IsFalse)
Expand Down
Loading