Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: specify time range via TIME_RANGE hint for metrics/inspection tables #14874

Merged
merged 11 commits into from
Feb 21, 2020
12 changes: 8 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
retriever: &inspectionResultRetriever{
extractor: v.Extractor.(*plannercore.InspectionResultTableExtractor),
timeRange: v.QueryTimeRange,
},
}
case strings.ToLower(infoschema.TableInspectionSummary):
Expand All @@ -1327,22 +1328,25 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
retriever: &inspectionSummaryRetriever{
table: v.Table,
extractor: v.Extractor.(*plannercore.InspectionSummaryTableExtractor),
timeRange: v.QueryTimeRange,
},
}
case strings.ToLower(infoschema.TableMetricSummary):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
retriever: &MetricSummaryRetriever{
retriever: &MetricsSummaryRetriever{
table: v.Table,
extractor: v.Extractor.(*plannercore.MetricTableExtractor),
extractor: v.Extractor.(*plannercore.MetricSummaryTableExtractor),
timeRange: v.QueryTimeRange,
},
}
case strings.ToLower(infoschema.TableMetricSummaryByLabel):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
retriever: &MetricSummaryByLabelRetriever{
retriever: &MetricsSummaryByLabelRetriever{
table: v.Table,
extractor: v.Extractor.(*plannercore.MetricTableExtractor),
extractor: v.Extractor.(*plannercore.MetricSummaryTableExtractor),
timeRange: v.QueryTimeRange,
},
}
case strings.ToLower(infoschema.TableSchemata), strings.ToLower(infoschema.TableViews):
Expand Down
4 changes: 2 additions & 2 deletions executor/explainfor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ func (s *testSuite) TestIssue11124(c *C) {

func (s *testSuite) TestExplainMetricTable(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustQuery(fmt.Sprintf("desc select * from METRIC_SCHEMA.tidb_query_duration where time >= '2019-12-23 16:10:13' and time <= '2019-12-23 16:30:13' ")).Check(testkit.Rows(
tk.MustQuery(fmt.Sprintf("desc select * from METRICS_SCHEMA.tidb_query_duration where time >= '2019-12-23 16:10:13' and time <= '2019-12-23 16:30:13' ")).Check(testkit.Rows(
"MemTableScan_5 10000.00 root PromQL:histogram_quantile(0.9, sum(rate(tidb_server_handle_query_duration_seconds_bucket{}[60s])) by (le,sql_type,instance)), start_time:2019-12-23 16:10:13, end_time:2019-12-23 16:30:13, step:1m0s"))
tk.MustQuery(fmt.Sprintf("desc select * from METRIC_SCHEMA.up where time >= '2019-12-23 16:10:13' and time <= '2019-12-23 16:30:13' ")).Check(testkit.Rows(
tk.MustQuery(fmt.Sprintf("desc select * from METRICS_SCHEMA.up where time >= '2019-12-23 16:10:13' and time <= '2019-12-23 16:30:13' ")).Check(testkit.Rows(
"MemTableScan_5 10000.00 root PromQL:up{}, start_time:2019-12-23 16:10:13, end_time:2019-12-23 16:30:13, step:1m0s"))
}
73 changes: 45 additions & 28 deletions executor/inspection_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ type (

inspectionName string

inspectionFilter struct{ set.StringSet }
inspectionFilter struct {
set set.StringSet
timeRange plannercore.QueryTimeRange
}

inspectionRule interface {
name() string
Expand All @@ -60,7 +63,11 @@ func (n inspectionName) name() string {
}

func (f inspectionFilter) enable(name string) bool {
return len(f.StringSet) == 0 || f.Exist(name)
return len(f.set) == 0 || f.set.Exist(name)
}

func (f inspectionFilter) exist(name string) bool {
return len(f.set) > 0 && f.set.Exist(name)
}

type (
Expand Down Expand Up @@ -96,6 +103,7 @@ type inspectionResultRetriever struct {
dummyCloser
retrieved bool
extractor *plannercore.InspectionResultTableExtractor
timeRange plannercore.QueryTimeRange
}

func (e *inspectionResultRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
Expand Down Expand Up @@ -126,8 +134,8 @@ func (e *inspectionResultRetriever) retrieve(ctx context.Context, sctx sessionct
}
})

rules := inspectionFilter{e.extractor.Rules}
items := inspectionFilter{e.extractor.Items}
rules := inspectionFilter{set: e.extractor.Rules}
items := inspectionFilter{set: e.extractor.Items, timeRange: e.timeRange}
var finalRows [][]types.Datum
for _, r := range inspectionRules {
name := r.name()
Expand Down Expand Up @@ -297,7 +305,6 @@ func (currentLoadInspection) inspect(_ context.Context, sctx sessionctx.Context,
}

func (criticalErrorInspection) inspect(ctx context.Context, sctx sessionctx.Context, filter inspectionFilter) []inspectionResult {
// TODO: specify the `begin` and `end` time of metric query
var rules = []struct {
tp string
item string
Expand All @@ -319,16 +326,17 @@ func (criticalErrorInspection) inspect(ctx context.Context, sctx sessionctx.Cont
{tp: "tikv", item: "grpc-errors", tbl: "tikv_grpc_errors"},
}

condition := filter.timeRange.Condition()
var results []inspectionResult
for _, rule := range rules {
if filter.enable(rule.item) {
def, ok := infoschema.MetricTableMap[rule.tbl]
if !ok {
sctx.GetSessionVars().StmtCtx.AppendWarning(fmt.Errorf("metrics table: %s not fouund", rule.tbl))
def, found := infoschema.MetricTableMap[rule.tbl]
if !found {
sctx.GetSessionVars().StmtCtx.AppendWarning(fmt.Errorf("metrics table: %s not found", rule.tbl))
continue
}
sql := fmt.Sprintf("select `%[1]s`, max(value) as max_value from `%[2]s`.`%[3]s` group by `%[1]s` having max_value > 0.0",
strings.Join(def.Labels, "`,`"), util.MetricSchemaName.L, rule.tbl)
sql := fmt.Sprintf("select `%[1]s`,max(value) as max from `%[2]s`.`%[3]s` %[4]s group by `%[1]s` having max>0.0",
strings.Join(def.Labels, "`,`"), util.MetricSchemaName.L, rule.tbl, condition)
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQLWithContext(ctx, sql)
if err != nil {
sctx.GetSessionVars().StmtCtx.AppendWarning(fmt.Errorf("execute '%s' failed: %v", sql, err))
Expand All @@ -337,19 +345,19 @@ func (criticalErrorInspection) inspect(ctx context.Context, sctx sessionctx.Cont
for _, row := range rows {
var actual, detail string
if rest := def.Labels[1:]; len(rest) > 0 {
pairs := make([]string, 0, len(rest))
values := make([]string, 0, len(rest))
// `i+1` and `1+len(rest)` means skip the first field `instance`
for i, label := range rest {
pairs = append(pairs, fmt.Sprintf("`%s`='%s'", label, row.GetString(i+1)))
for i := range rest {
values = append(values, row.GetString(i+1))
}
// TODO: find a better way to construct the `actual` field
actual = fmt.Sprintf("{%s}=%.2f", strings.Join(pairs, ","), row.GetFloat64(1+len(rest)))
detail = fmt.Sprintf("select * from `%s`.`%s` where `instance`='%s' and %s",
util.MetricSchemaName.L, rule.tbl, row.GetString(0), strings.Join(pairs, " and "))
actual = fmt.Sprintf("%.2f(%s)", row.GetFloat64(1+len(rest)), strings.Join(values, ", "))
detail = fmt.Sprintf("select * from `%s`.`%s` %s and (`instance`,`%s`)=('%s','%s')",
util.MetricSchemaName.L, rule.tbl, condition, strings.Join(rest, "`,`"), row.GetString(0), strings.Join(values, "','"))
} else {
actual = fmt.Sprintf("%.2f", row.GetFloat64(1))
detail = fmt.Sprintf("select * from `%s`.`%s` where `instance`='%s'",
util.MetricSchemaName.L, rule.tbl, row.GetString(0))
detail = fmt.Sprintf("select * from `%s`.`%s` %s and `instance`='%s'",
util.MetricSchemaName.L, rule.tbl, condition, row.GetString(0))
}
result := inspectionResult{
tp: rule.tp,
Expand Down Expand Up @@ -453,18 +461,24 @@ func (thresholdCheckInspection) inspectThreshold1(ctx context.Context, sctx sess
threshold: 0.9,
},
}

condition := filter.timeRange.Condition()
var results []inspectionResult
for _, rule := range rules {
if !filter.enable(rule.item) {
continue
}

var sql string
if len(rule.configKey) > 0 {
sql = fmt.Sprintf("select t1.instance, t1.cpu, t2.threshold, t2.value from "+
"(select instance, sum(value) as cpu from metric_schema.tikv_thread_cpu where name like '%[1]s' and time=now() group by instance) as t1,"+
"(select instance, sum(value) as cpu from metrics_schema.tikv_thread_cpu %[4]s and name like '%[1]s' group by instance) as t1,"+
"(select value * %[2]f as threshold, value from inspection_schema.cluster_config where type='tikv' and `key` = '%[3]s' limit 1) as t2 "+
"where t1.cpu > t2.threshold;", rule.component, rule.threshold, rule.configKey)
"where t1.cpu > t2.threshold;", rule.component, rule.threshold, rule.configKey, condition)
} else {
sql = fmt.Sprintf("select t1.instance, t1.cpu, %[2]f from "+
"(select instance, sum(value) as cpu from metric_schema.tikv_thread_cpu where name like '%[1]s' and time=now() group by instance) as t1 "+
"where t1.cpu > %[2]f;", rule.component, rule.threshold)
"(select instance, sum(value) as cpu from metrics_schema.tikv_thread_cpu %[3]s and name like '%[1]s' group by instance) as t1 "+
"where t1.cpu > %[2]f;", rule.component, rule.threshold, condition)
}
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQLWithContext(ctx, sql)
if err != nil {
Expand All @@ -479,7 +493,8 @@ func (thresholdCheckInspection) inspectThreshold1(ctx context.Context, sctx sess
} else {
expected = fmt.Sprintf("< %.2f", row.GetFloat64(2))
}
detail := fmt.Sprintf("select instance, sum(value) as cpu from metric_schema.tikv_thread_cpu where name like '%[1]s' and time=now() group by instance", rule.component)
detail := fmt.Sprintf("select instance, sum(value) as cpu from metrics_schema.tikv_thread_cpu %s and name like '%s' group by instance",
condition, rule.component)
result := inspectionResult{
tp: "tikv",
instance: row.GetString(0),
Expand Down Expand Up @@ -604,20 +619,22 @@ func (thresholdCheckInspection) inspectThreshold2(ctx context.Context, sctx sess
isMin: true,
},
}

condition := filter.timeRange.Condition()
var results []inspectionResult
for _, rule := range rules {
if !filter.enable(rule.item) {
continue
}
var sql string
condition := rule.condition
if len(condition) > 0 {
condition = "where " + condition
cond := condition
if len(rule.condition) > 0 {
cond = fmt.Sprintf("%s and %s", cond, rule.condition)
}
if rule.isMin {
sql = fmt.Sprintf("select instance, min(value) as min_value from metric_schema.%s %s group by instance having min_value < %f;", rule.tbl, condition, rule.threshold)
sql = fmt.Sprintf("select instance, min(value) as min_value from metrics_schema.%s %s group by instance having min_value < %f;", rule.tbl, cond, rule.threshold)
} else {
sql = fmt.Sprintf("select instance, max(value) as max_value from metric_schema.%s %s group by instance having max_value > %f;", rule.tbl, condition, rule.threshold)
sql = fmt.Sprintf("select instance, max(value) as max_value from metrics_schema.%s %s group by instance having max_value > %f;", rule.tbl, cond, rule.threshold)
}
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQLWithContext(ctx, sql)
if err != nil {
Expand Down
Loading