Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resourcemanger: add cpu scheduler #39886

Merged
merged 32 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
14901ca
improve code
hawkingrei Dec 16, 2022
c528b13
*: fix revive false positive with generics
hawkingrei Dec 6, 2022
b354171
*: fix revive false positive with generics
hawkingrei Dec 6, 2022
2470c69
*: fix revive false positive with generics
hawkingrei Dec 6, 2022
6b3ff7b
update
hawkingrei Dec 6, 2022
bd60251
manage -> manager
hawkingrei Dec 6, 2022
6787d94
update
hawkingrei Dec 6, 2022
ca126e8
*: upgrade manage
hawkingrei Dec 7, 2022
e1ce5fb
use RWMutex
hawkingrei Dec 8, 2022
8c5287d
update commit
hawkingrei Dec 8, 2022
e9144c5
update commit
hawkingrei Dec 8, 2022
ce63425
refactor
hawkingrei Dec 9, 2022
279f27c
improve code
hawkingrei Dec 12, 2022
3ec7dc4
add comments
hawkingrei Dec 12, 2022
e7a8496
add comments
hawkingrei Dec 12, 2022
7e2e1bd
update test
hawkingrei Dec 12, 2022
2cfff72
minCPUSchedulerInterval use atomic value
hawkingrei Dec 12, 2022
681677b
minCPUSchedulerInterval use atomic value
hawkingrei Dec 12, 2022
1548c33
*: cpu resource manager
hawkingrei Dec 13, 2022
32a4f05
*: cpu resource manager
hawkingrei Dec 13, 2022
986a6a1
resourcemanger: add cpu scheduler
hawkingrei Dec 13, 2022
14cb418
resourcemanger: add cpu scheduler
hawkingrei Dec 13, 2022
1c873dd
resourcemanger: add cpu scheduler
hawkingrei Dec 13, 2022
4f1504b
resourcemanger: add cpu scheduler
hawkingrei Dec 13, 2022
8de0b4e
expression: close recordset
hawkingrei Dec 14, 2022
3588a50
*: add test
hawkingrei Jan 3, 2023
a509c0d
*: add test
hawkingrei Jan 3, 2023
d35f3e2
update bazel
hawkingrei Jan 3, 2023
da11763
update bazel
hawkingrei Jan 3, 2023
1abbfb9
bazel update
hawkingrei Jan 4, 2023
ce0cfec
update
hawkingrei Jan 4, 2023
1a4c7d6
Merge branch 'master' into add_resource_manage_2
ti-chi-bot Jan 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions resourcemanager/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "resourcemanage",
srcs = ["rm.go"],
name = "resourcemanager",
srcs = [
"rm.go",
"schedule.go",
],
importpath = "github.com/pingcap/tidb/resourcemanager",
visibility = ["//visibility:public"],
deps = [
"//resourcemanager/scheduler",
"//resourcemanager/util",
"//util",
"//util/cpu",
"@com_github_pingcap_log//:log",
"@org_uber_go_zap//:zap",
],
)
35 changes: 35 additions & 0 deletions resourcemanager/rm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
package resourcemanager

import (
"time"

"github.com/pingcap/tidb/resourcemanager/scheduler"
"github.com/pingcap/tidb/resourcemanager/util"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/cpu"
)
Expand All @@ -24,24 +28,55 @@ var GlobalResourceManager = NewResourceManger()

// ResourceManager is a resource manager
type ResourceManager struct {
poolMap *util.ShardPoolMap
scheduler []scheduler.Scheduler
cpuObserver *cpu.Observer
exitCh chan struct{}
wg tidbutil.WaitGroupWrapper
}

// NewResourceManger is to create a new resource manager
func NewResourceManger() *ResourceManager {
sc := make([]scheduler.Scheduler, 0, 1)
sc = append(sc, scheduler.NewCPUScheduler())
return &ResourceManager{
cpuObserver: cpu.NewCPUObserver(),
exitCh: make(chan struct{}),
poolMap: util.NewShardPoolMap(),
scheduler: sc,
}
}

// Start is to start resource manager
func (r *ResourceManager) Start() {
r.wg.Run(r.cpuObserver.Start)
r.wg.Run(func() {
tick := time.NewTicker(100 * time.Millisecond)
defer tick.Stop()
for {
select {
case <-tick.C:
r.schedule()
case <-r.exitCh:
return
}
}
})
}

// Stop is to stop resource manager
func (r *ResourceManager) Stop() {
r.cpuObserver.Stop()
close(r.exitCh)
r.wg.Wait()
}

// Register is to register pool into resource manager
func (r *ResourceManager) Register(pool util.GorotinuePool, name string, component util.Component) error {
p := util.PoolContainer{Pool: pool, Component: component}
return r.registerPool(name, &p)
}

func (r *ResourceManager) registerPool(name string, pool *util.PoolContainer) error {
return r.poolMap.Add(name, pool)
}
69 changes: 69 additions & 0 deletions resourcemanager/schedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package resourcemanager

import (
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/resourcemanager/scheduler"
"github.com/pingcap/tidb/resourcemanager/util"
"go.uber.org/zap"
)

func (r *ResourceManager) schedule() {
r.poolMap.Iter(func(pool *util.PoolContainer) {
cmd := r.schedulePool(pool)
r.exec(pool, cmd)
})
}

func (r *ResourceManager) schedulePool(pool *util.PoolContainer) scheduler.Command {
for _, sch := range r.scheduler {
cmd := sch.Tune(pool.Component, pool.Pool)
switch cmd {
case scheduler.Hold:
continue
default:
return cmd
}
}
return scheduler.Hold
}

func (*ResourceManager) exec(pool *util.PoolContainer, cmd scheduler.Command) {
if cmd == scheduler.Hold {
return
}
if time.Since(pool.Pool.LastTunerTs()) > 200*time.Millisecond {
con := pool.Pool.Cap()
switch cmd {
case scheduler.Downclock:
concurrency := con - 1
log.Info("downclock goroutine pool",
zap.Int("origin concurrency", con),
zap.Int("concurrency", concurrency),
zap.String("name", pool.Pool.Name()))
pool.Pool.Tune(concurrency)
case scheduler.Overclock:
concurrency := con + 1
log.Info("overclock goroutine pool",
zap.Int("origin concurrency", con),
zap.Int("concurrency", concurrency),
zap.String("name", pool.Pool.Name()))
pool.Pool.Tune(concurrency)
}
}
}
16 changes: 16 additions & 0 deletions resourcemanager/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "scheduler",
srcs = [
"cpu_scheduler.go",
"scheduler.go",
],
importpath = "github.com/pingcap/tidb/resourcemanager/scheduler",
visibility = ["//visibility:public"],
deps = [
"//resourcemanager/util",
"//util/cpu",
"@org_uber_go_atomic//:atomic",
],
)
44 changes: 44 additions & 0 deletions resourcemanager/scheduler/cpu_scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scheduler

import (
"time"

"github.com/pingcap/tidb/resourcemanager/util"
"github.com/pingcap/tidb/util/cpu"
)

// CPUScheduler is a cpu scheduler
type CPUScheduler struct{}

// NewCPUScheduler is to create a new cpu scheduler
func NewCPUScheduler() *CPUScheduler {
return &CPUScheduler{}
}

// Tune is to tune the goroutine pool
func (*CPUScheduler) Tune(_ util.Component, pool util.GorotinuePool) Command {
if time.Since(pool.LastTunerTs()) < minCPUSchedulerInterval.Load() {
return Hold
}
if cpu.GetCPUUsage() < 0.5 {
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
return Downclock
}
if cpu.GetCPUUsage() > 0.7 {
return Overclock
}
return Hold
}
43 changes: 43 additions & 0 deletions resourcemanager/scheduler/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scheduler

import (
"time"

"github.com/pingcap/tidb/resourcemanager/util"
"go.uber.org/atomic"
)

var (
minCPUSchedulerInterval = atomic.NewDuration(time.Minute)
)

// Command is the command for scheduler
type Command int

const (
// Downclock is to reduce the number of concurrency.
Downclock Command = iota
// Hold is to hold the number of concurrency.
Hold
// Overclock is to increase the number of concurrency.
Overclock
)

// Scheduler is a scheduler interface
type Scheduler interface {
Tune(component util.Component, p util.GorotinuePool) Command
}
20 changes: 20 additions & 0 deletions resourcemanager/util/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "util",
srcs = [
"mock_gpool.go",
"shard_pool_map.go",
"util.go",
],
importpath = "github.com/pingcap/tidb/resourcemanager/util",
visibility = ["//visibility:public"],
deps = ["@com_github_pingcap_errors//:errors"],
)

go_test(
name = "util_test",
srcs = ["shard_pool_map_test.go"],
embed = [":util"],
deps = ["@com_github_stretchr_testify//require"],
)
97 changes: 97 additions & 0 deletions resourcemanager/util/mock_gpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import "time"

// MockGPool is only for test
type MockGPool struct {
name string
}

// NewMockGPool is only for test
func NewMockGPool(name string) *MockGPool {
return &MockGPool{name: name}
}

// Release is only for test
func (*MockGPool) Release() {
panic("implement me")
}

// Tune is only for test
func (*MockGPool) Tune(_ int) {
panic("implement me")
}

// LastTunerTs is only for test
func (*MockGPool) LastTunerTs() time.Time {
panic("implement me")
}

// MaxInFlight is only for test
func (*MockGPool) MaxInFlight() int64 {
panic("implement me")
}

// InFlight is only for test
func (*MockGPool) InFlight() int64 {
panic("implement me")
}

// MinRT is only for test
func (*MockGPool) MinRT() uint64 {
panic("implement me")
}

// MaxPASS is only for test
func (*MockGPool) MaxPASS() uint64 {
panic("implement me")
}

// Cap is only for test
func (*MockGPool) Cap() int {
panic("implement me")
}

// LongRTT is to represent the baseline latency by tracking a measurement of the long term, less volatile RTT.
func (*MockGPool) LongRTT() float64 {
panic("implement me")
}

// UpdateLongRTT is only for test
func (*MockGPool) UpdateLongRTT(_ func(float64) float64) {
panic("implement me")
}

// ShortRTT is to represent the current system latency by tracking a measurement of the short time, and more volatile RTT.
func (*MockGPool) ShortRTT() uint64 {
panic("implement me")
}

// GetQueueSize is only for test
func (*MockGPool) GetQueueSize() int64 {
panic("implement me")
}

// Running is only for test
func (*MockGPool) Running() int {
panic("implement me")
}

// Name is only for test
func (m *MockGPool) Name() string {
return m.name
}
Loading