Skip to content

Commit

Permalink
*: adaptively control analyze distsql concurrency (#53261)
Browse files Browse the repository at this point in the history
close #53262
  • Loading branch information
hawkingrei authored May 21, 2024
1 parent 53bf76f commit def7c23
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/executor/analyze_col_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func (e *AnalyzeColumnsExecV2) buildSubIndexJobForSpecialIndex(indexInfos []*mod
_, offset := timeutil.Zone(e.ctx.GetSessionVars().Location())
tasks := make([]*analyzeTask, 0, len(indexInfos))
sc := e.ctx.GetSessionVars().StmtCtx
concurrency := e.ctx.GetSessionVars().AnalyzeDistSQLScanConcurrency()
concurrency := adaptiveAnlayzeDistSQLConcurrency(context.Background(), e.ctx)
for _, indexInfo := range indexInfos {
base := baseAnalyzeExec{
ctx: e.ctx,
Expand Down
39 changes: 39 additions & 0 deletions pkg/executor/analyze_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,50 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tiancaiamao/gp"
"go.uber.org/atomic"
"go.uber.org/zap"
)

func adaptiveAnlayzeDistSQLConcurrency(ctx context.Context, sctx sessionctx.Context) int {
concurrency := sctx.GetSessionVars().AnalyzeDistSQLScanConcurrency()
if concurrency > 0 {
return concurrency
}
tikvStore, ok := sctx.GetStore().(helper.Storage)
if !ok {
logutil.BgLogger().Warn("Information about TiKV store status can be gotten only when the storage is TiKV")
return variable.DefAnalyzeDistSQLScanConcurrency
}
tikvHelper := &helper.Helper{
Store: tikvStore,
RegionCache: tikvStore.GetRegionCache(),
}
pdCli, err := tikvHelper.TryGetPDHTTPClient()
if err != nil {
logutil.BgLogger().Warn("fail to TryGetPDHTTPClient", zap.Error(err))
return variable.DefAnalyzeDistSQLScanConcurrency
}
storesStat, err := pdCli.GetStores(ctx)
if err != nil {
logutil.BgLogger().Warn("fail to get stores info", zap.Error(err))
return variable.DefAnalyzeDistSQLScanConcurrency
}
if storesStat.Count <= 5 {
return variable.DefAnalyzeDistSQLScanConcurrency
} else if storesStat.Count <= 10 {
return storesStat.Count
} else if storesStat.Count <= 20 {
return storesStat.Count * 2
} else if storesStat.Count <= 50 {
return storesStat.Count * 3
}
return storesStat.Count * 4
}

func getIntFromSessionVars(ctx sessionctx.Context, name string) (int, error) {
sessionVars := ctx.GetSessionVars()
concurrency, err := sessionVars.GetSessionOrGlobalSystemVar(context.Background(), name)
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2492,7 +2492,7 @@ func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeInde
failpoint.Inject("injectAnalyzeSnapshot", func(val failpoint.Value) {
startTS = uint64(val.(int))
})
concurrency := b.ctx.GetSessionVars().AnalyzeDistSQLScanConcurrency()
concurrency := adaptiveAnlayzeDistSQLConcurrency(context.Background(), b.ctx)
base := baseAnalyzeExec{
ctx: b.ctx,
tableID: task.TableID,
Expand Down Expand Up @@ -2609,7 +2609,7 @@ func (b *executorBuilder) buildAnalyzeSamplingPushdown(
PartitionName: task.PartitionName,
SampleRateReason: sampleRateReason,
}
concurrency := b.ctx.GetSessionVars().AnalyzeDistSQLScanConcurrency()
concurrency := adaptiveAnlayzeDistSQLConcurrency(context.Background(), b.ctx)
base := baseAnalyzeExec{
ctx: b.ctx,
tableID: task.TableID,
Expand Down Expand Up @@ -2743,7 +2743,7 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(
failpoint.Inject("injectAnalyzeSnapshot", func(val failpoint.Value) {
startTS = uint64(val.(int))
})
concurrency := b.ctx.GetSessionVars().AnalyzeDistSQLScanConcurrency()
concurrency := adaptiveAnlayzeDistSQLConcurrency(context.Background(), b.ctx)
base := baseAnalyzeExec{
ctx: b.ctx,
tableID: task.TableID,
Expand Down
4 changes: 2 additions & 2 deletions pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1366,7 +1366,7 @@ var defaultSysVars = []*SysVar{
return BoolToOnOff(EnableTmpStorageOnOOM.Load()), nil
}},
{Scope: ScopeGlobal, Name: TiDBAutoBuildStatsConcurrency, Value: strconv.Itoa(DefTiDBAutoBuildStatsConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency},
{Scope: ScopeGlobal, Name: TiDBSysProcScanConcurrency, Value: strconv.Itoa(DefTiDBSysProcScanConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt32},
{Scope: ScopeGlobal, Name: TiDBSysProcScanConcurrency, Value: strconv.Itoa(DefTiDBSysProcScanConcurrency), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt32},
{Scope: ScopeGlobal, Name: TiDBMemoryUsageAlarmRatio, Value: strconv.FormatFloat(DefMemoryUsageAlarmRatio, 'f', -1, 64), Type: TypeFloat, MinValue: 0.0, MaxValue: 1.0, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
MemoryUsageAlarmRatio.Store(tidbOptFloat64(val, DefMemoryUsageAlarmRatio))
return nil
Expand Down Expand Up @@ -1821,7 +1821,7 @@ var defaultSysVars = []*SysVar{
s.distSQLScanConcurrency = tidbOptPositiveInt32(val, DefDistSQLScanConcurrency)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAnalyzeDistSQLScanConcurrency, Value: strconv.Itoa(DefAnalyzeDistSQLScanConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxInt32, SetSession: func(s *SessionVars, val string) error {
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAnalyzeDistSQLScanConcurrency, Value: strconv.Itoa(DefAnalyzeDistSQLScanConcurrency), Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxInt32, SetSession: func(s *SessionVars, val string) error {
s.analyzeDistSQLScanConcurrency = tidbOptPositiveInt32(val, DefAnalyzeDistSQLScanConcurrency)
return nil
}},
Expand Down

0 comments on commit def7c23

Please sign in to comment.