Skip to content

Commit

Permalink
metrics: add tidb_rm_pool_concurrency (#41113)
Browse files Browse the repository at this point in the history
close #41114
  • Loading branch information
hawkingrei authored Feb 7, 2023
1 parent c996d64 commit b2cfcca
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ build:race --@io_bazel_rules_go//go/config:race --test_env=GORACE=halt_on_error=

test --test_env=TZ=Asia/Shanghai
test --test_output=errors --test_summary=testcase
test:ci --color=yes
test:ci --color=yes --spawn_strategy=local
test:ci --verbose_failures --test_verbose_timeout_warnings
test:ci --test_env=GO_TEST_WRAP_TESTV=1
test:ci --experimental_ui_max_stdouterr_bytes=104857600
Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TTLPhaseTime)

prometheus.MustRegister(EMACPUUsageGauge)
prometheus.MustRegister(PoolConcurrencyCounter)

prometheus.MustRegister(HistoricalStatsCounter)
prometheus.MustRegister(PlanReplayerTaskCounter)
Expand Down
8 changes: 8 additions & 0 deletions metrics/resourcemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,12 @@ var (
Name: "ema_cpu_usage",
Help: "exponential moving average of CPU usage",
})
// PoolConcurrencyCounter means how much concurrency in the pool
PoolConcurrencyCounter = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "rm",
Name: "pool_concurrency",
Help: "How many concurrency in the pool",
}, []string{LblType})
)
2 changes: 2 additions & 0 deletions util/gpool/spmc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ go_library(
importpath = "github.com/pingcap/tidb/util/gpool/spmc",
visibility = ["//visibility:public"],
deps = [
"//metrics",
"//resourcemanager",
"//resourcemanager/pooltask",
"//resourcemanager/util",
"//util/gpool",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@com_github_prometheus_client_golang//prometheus",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
Expand Down
32 changes: 19 additions & 13 deletions util/gpool/spmc/spmcpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/resourcemanager"
"github.com/pingcap/tidb/resourcemanager/pooltask"
"github.com/pingcap/tidb/resourcemanager/util"
"github.com/pingcap/tidb/util/gpool"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/prometheus"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)
Expand All @@ -47,13 +49,14 @@ type Pool[T any, U any, C any, CT any, TF pooltask.Context[CT]] struct {
workers *loopQueue[T, U, C, CT, TF]
options *Options
gpool.BasePool
taskManager pooltask.TaskManager[T, U, C, CT, TF]
waitingTask atomicutil.Uint32
capacity atomic.Int32
running atomic.Int32
state atomic.Int32
waiting atomic.Int32 // waiting is the number of goroutines that are waiting for the pool to be available.
heartbeatDone atomic.Bool
taskManager pooltask.TaskManager[T, U, C, CT, TF]
waitingTask atomicutil.Uint32
capacity atomic.Int32
running atomic.Int32
state atomic.Int32
waiting atomic.Int32 // waiting is the number of goroutines that are waiting for the pool to be available.
heartbeatDone atomic.Bool
concurrencyMetrics prometheus.Gauge
}

// NewSPMCPool create a single producer, multiple consumer goroutine pool.
Expand All @@ -63,12 +66,13 @@ func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name stri
opts.ExpiryDuration = gpool.DefaultCleanIntervalTime
}
result := &Pool[T, U, C, CT, TF]{
BasePool: gpool.NewBasePool(),
taskCh: make(chan *pooltask.TaskBox[T, U, C, CT, TF], 128),
stopCh: make(chan struct{}),
lock: gpool.NewSpinLock(),
taskManager: pooltask.NewTaskManager[T, U, C, CT, TF](size),
options: opts,
BasePool: gpool.NewBasePool(),
taskCh: make(chan *pooltask.TaskBox[T, U, C, CT, TF], 128),
stopCh: make(chan struct{}),
lock: gpool.NewSpinLock(),
taskManager: pooltask.NewTaskManager[T, U, C, CT, TF](size),
concurrencyMetrics: metrics.PoolConcurrencyCounter.WithLabelValues(name),
options: opts,
}
result.SetName(name)
result.state.Store(int32(gpool.OPENED))
Expand All @@ -78,6 +82,7 @@ func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name stri
}
}
result.capacity.Add(size)
result.concurrencyMetrics.Set(float64(size))
result.workers = newWorkerLoopQueue[T, U, C, CT, TF](int(size))
result.cond = sync.NewCond(result.lock)
err := resourcemanager.InstanceResourceManager.Register(result, name, component)
Expand Down Expand Up @@ -141,6 +146,7 @@ func (p *Pool[T, U, C, CT, TF]) Tune(size int) {
}
p.SetLastTuneTs(time.Now())
p.capacity.Store(int32(size))
p.concurrencyMetrics.Set(float64(size))
if size > capacity {
for i := 0; i < size-capacity; i++ {
if tid, boostTask := p.taskManager.Overclock(size); boostTask != nil {
Expand Down

0 comments on commit b2cfcca

Please sign in to comment.