Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resouce_manager: impl resource controller for tokens client #5811

Merged
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
75190c9
impl resouce manager tenant side client
CabinfeverB Dec 29, 2022
0e3c929
Merge branch 'master' into resource-manager/tenant_side_client_1
CabinfeverB Jan 9, 2023
cf6b0d0
Introduce the RU coefficient config and refine some code
JmPotato Jan 11, 2023
1a6271d
Refine the code
JmPotato Jan 11, 2023
49a78a8
Merge branch 'master' into resource-manager/tenant_side_client_1
nolouch Jan 11, 2023
d1a17b1
refactor
CabinfeverB Jan 12, 2023
14228c1
refactor
CabinfeverB Jan 12, 2023
4ffccbc
merge
CabinfeverB Jan 12, 2023
9141699
merge
CabinfeverB Jan 12, 2023
395a01c
refactor limiter
CabinfeverB Jan 12, 2023
c31b6f7
fix bug
CabinfeverB Jan 15, 2023
f1bc87b
add limtier
CabinfeverB Jan 15, 2023
d74e600
address comment
CabinfeverB Jan 15, 2023
d68fd7f
remove useless code
CabinfeverB Jan 16, 2023
4b26a4b
merge limiter
CabinfeverB Jan 16, 2023
ac83f2c
address comment
CabinfeverB Jan 17, 2023
57d526f
address comment and add test
CabinfeverB Jan 17, 2023
0c46eaf
address comment and add test
CabinfeverB Jan 17, 2023
15a3cdb
merge master
CabinfeverB Jan 17, 2023
67d7962
merge master
CabinfeverB Jan 17, 2023
122c52b
fix test
CabinfeverB Jan 17, 2023
10542e7
merge master
CabinfeverB Jan 17, 2023
3b30333
merge master
CabinfeverB Jan 17, 2023
aeb133e
Merge branch 'master' into resource-manager/tenant_side_client_1
CabinfeverB Jan 17, 2023
55fedf5
merge master
CabinfeverB Jan 17, 2023
ed807b9
fix static check
CabinfeverB Jan 18, 2023
5e64c41
add test
CabinfeverB Jan 18, 2023
b4f1e57
fit loan
CabinfeverB Jan 18, 2023
a690b8e
add retry
CabinfeverB Jan 18, 2023
0dd78ff
add test
CabinfeverB Jan 18, 2023
8efd2e0
Merge branch 'resource-manager/server_fix_2' into resource-manager/te…
CabinfeverB Jan 18, 2023
606d6ba
address comment
CabinfeverB Jan 19, 2023
166909f
address comment
CabinfeverB Jan 19, 2023
07b4d67
address comment
CabinfeverB Jan 19, 2023
d711d4e
address comment
CabinfeverB Jan 19, 2023
e83f208
address comment
CabinfeverB Jan 19, 2023
c1b42ee
address comment
CabinfeverB Jan 19, 2023
d1d56d1
address comment
CabinfeverB Jan 19, 2023
16956b6
merge master
CabinfeverB Jan 19, 2023
37abfa8
address comment
CabinfeverB Jan 19, 2023
91eb7b3
Merge branch 'master' into resource-manager/tenant_side_client_1
ti-chi-bot Jan 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
790 changes: 790 additions & 0 deletions pkg/mcs/resource_manager/client/client.go

Large diffs are not rendered by default.

43 changes: 43 additions & 0 deletions pkg/mcs/resource_manager/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,42 @@

package client

import (
"time"

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
)

var (
requestUnitList map[rmpb.RequestUnitType]struct{} = map[rmpb.RequestUnitType]struct{}{
rmpb.RequestUnitType_RRU: {},
rmpb.RequestUnitType_WRU: {},
}
requestResourceList map[rmpb.RawResourceType]struct{} = map[rmpb.RawResourceType]struct{}{
rmpb.RawResourceType_IOReadFlow: {},
rmpb.RawResourceType_IOWriteFlow: {},
rmpb.RawResourceType_CPU: {},
}
)

const (
initialRequestUnits = 10000
bufferRUs = 2000
// movingAvgFactor is the weight applied to a new "sample" of RU usage (with one
// sample per mainLoopUpdateInterval).
//
// If we want a factor of 0.5 per second, this should be:
//
// 0.5^(1 second / mainLoopUpdateInterval)
movingAvgFactor = 0.5
notifyFraction = 0.1
consumptionsReportingThreshold = 100
extendedReportingPeriodFactor = 4
defaultGroupLoopUpdateInterval = 1 * time.Second
defaultTargetPeriod = 10 * time.Second
defaultMaxRequestTokens = 1e8
)

const (
defaultReadBaseCost = 1
defaultReadCostPerByte = 1. / 1024 / 1024
Expand Down Expand Up @@ -55,6 +91,10 @@ func DefaultRequestUnitConfig() *RequestUnitConfig {
// units or request resource cost standards. It should be calculated by a given `RequestUnitConfig`
// or `RequestResourceConfig`.
type Config struct {
groupLoopUpdateInterval time.Duration
targetPeriod time.Duration
maxRequestTokens float64

ReadBaseCost RequestUnit
ReadBytesCost RequestUnit
ReadCPUMsCost RequestUnit
Expand All @@ -79,5 +119,8 @@ func generateConfig(ruConfig *RequestUnitConfig) *Config {
WriteBaseCost: RequestUnit(ruConfig.WriteBaseCost),
WriteBytesCost: RequestUnit(ruConfig.WriteCostPerByte),
}
cfg.groupLoopUpdateInterval = defaultGroupLoopUpdateInterval
cfg.targetPeriod = defaultTargetPeriod
cfg.maxRequestTokens = defaultMaxRequestTokens
return cfg
}
20 changes: 14 additions & 6 deletions pkg/mcs/resource_manager/client/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (r *Reservation) CancelAt(now time.Time) {
// Act()
//
// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
func (lim *Limiter) Reserve(ctx context.Context, now time.Time, n float64) *Reservation {
func (lim *Limiter) Reserve(ctx context.Context, waitDuration time.Duration, now time.Time, n float64) *Reservation {
// Check if ctx is already cancelled
select {
case <-ctx.Done():
Expand All @@ -184,7 +184,7 @@ func (lim *Limiter) Reserve(ctx context.Context, now time.Time, n float64) *Rese
default:
}
// Determine wait limit
waitLimit := InfDuration
waitLimit := waitDuration
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(now)
}
Expand All @@ -194,6 +194,8 @@ func (lim *Limiter) Reserve(ctx context.Context, now time.Time, n float64) *Rese

// SetupNotificationThreshold enables the notification at the given threshold.
func (lim *Limiter) SetupNotificationThreshold(now time.Time, threshold float64) {
lim.mu.Lock()
defer lim.mu.Unlock()
lim.advance(now)
lim.notifyThreshold = threshold
}
Expand All @@ -215,19 +217,25 @@ func (lim *Limiter) notify() {
// maybeNotify checks if it's time to send the notification and if so, performs
// the notification.
func (lim *Limiter) maybeNotify() {
if lim.IsLowTokens() {
if lim.isLowTokensLocked() {
lim.notify()
}
}

// IsLowTokens returns whether the limiter is in low tokens
func (lim *Limiter) IsLowTokens() bool {
func (lim *Limiter) isLowTokensLocked() bool {
if lim.isLowProcess || (lim.notifyThreshold > 0 && lim.tokens < lim.notifyThreshold) {
return true
}
return false
}

// IsLowTokens returns whether the limiter is in low tokens
func (lim *Limiter) IsLowTokens() bool {
lim.mu.Lock()
defer lim.mu.Unlock()
return lim.isLowTokensLocked()
}

// RemoveTokens decreases the amount of tokens currently available.
func (lim *Limiter) RemoveTokens(now time.Time, amount float64) {
lim.mu.Lock()
Expand Down Expand Up @@ -373,7 +381,7 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv
for _, res := range reservations {
if !res.ok {
cancel()
return fmt.Errorf("[resource group controller] limiter has no enough token")
return fmt.Errorf("[resource group controller] limiter has no enough token or needs wait too long")
}
delay := res.DelayFrom(now)
if delay > longestDelayDuration {
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/resource_manager/client/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ func TestCancel(t *testing.T) {
r1.CancelAt(t1)
checkTokens(re, lim1, t1, 11)

r1 = lim1.Reserve(ctx, t1, 5)
r2 := lim2.Reserve(ctx1, t1, 5)
r1 = lim1.Reserve(ctx, InfDuration, t1, 5)
r2 := lim2.Reserve(ctx1, InfDuration, t1, 5)
checkTokens(re, lim1, t2, 7)
checkTokens(re, lim2, t2, 2)
err := WaitReservations(ctx, t2, []*Reservation{r1, r2})
Expand All @@ -151,8 +151,8 @@ func TestCancel(t *testing.T) {
cancel1()

ctx2, cancel2 := context.WithCancel(ctx)
r1 = lim1.Reserve(ctx, t3, 5)
r2 = lim2.Reserve(ctx2, t3, 5)
r1 = lim1.Reserve(ctx, InfDuration, t3, 5)
r2 = lim2.Reserve(ctx2, InfDuration, t3, 5)
checkTokens(re, lim1, t3, 8)
checkTokens(re, lim2, t3, -2)
var wg sync.WaitGroup
Expand Down
56 changes: 50 additions & 6 deletions pkg/mcs/resource_manager/client/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ type KVCalculator struct {

var _ ResourceCalculator = (*KVCalculator)(nil)

// func newKVCalculator(cfg *Config) *KVCalculator {
// return &KVCalculator{Config: cfg}
// }
func newKVCalculator(cfg *Config) *KVCalculator {
return &KVCalculator{Config: cfg}
}

// Trickle ...
func (kc *KVCalculator) Trickle(ctx context.Context, consumption *rmpb.Consumption) {
Expand Down Expand Up @@ -104,9 +104,9 @@ type SQLCalculator struct {

var _ ResourceCalculator = (*SQLCalculator)(nil)

// func newSQLCalculator(cfg *Config) *SQLCalculator {
// return &SQLCalculator{Config: cfg}
// }
func newSQLCalculator(cfg *Config) *SQLCalculator {
return &SQLCalculator{Config: cfg}
}

// Trickle ...
// TODO: calculate the SQL CPU cost and related resource consumption.
Expand All @@ -120,3 +120,47 @@ func (dsc *SQLCalculator) BeforeKVRequest(consumption *rmpb.Consumption, req Req
// AfterKVRequest ...
func (dsc *SQLCalculator) AfterKVRequest(consumption *rmpb.Consumption, req RequestInfo, res ResponseInfo) {
}

func getRUValueFromConsumption(custom *rmpb.Consumption, typ rmpb.RequestUnitType) float64 {
switch typ {
case 0:
return custom.RRU
case 1:
return custom.WRU
}
return 0
}

func getRawResourceValueFromConsumption(custom *rmpb.Consumption, typ rmpb.RawResourceType) float64 {
switch typ {
case 0:
return custom.TotalCpuTimeMs
case 1:
return custom.ReadBytes
case 2:
return custom.WriteBytes
}
return 0
}

func add(custom1 *rmpb.Consumption, custom2 *rmpb.Consumption) {
custom1.RRU += custom2.RRU
custom1.WRU += custom2.WRU
custom1.ReadBytes += custom2.ReadBytes
custom1.WriteBytes += custom2.WriteBytes
custom1.TotalCpuTimeMs += custom2.TotalCpuTimeMs
custom1.SqlLayerCpuTimeMs += custom2.SqlLayerCpuTimeMs
custom1.KvReadRpcCount += custom2.KvReadRpcCount
custom1.KvWriteRpcCount += custom2.KvWriteRpcCount
}

func sub(custom1 *rmpb.Consumption, custom2 *rmpb.Consumption) {
custom1.RRU -= custom2.RRU
custom1.WRU -= custom2.WRU
custom1.ReadBytes -= custom2.ReadBytes
custom1.WriteBytes -= custom2.WriteBytes
custom1.TotalCpuTimeMs -= custom2.TotalCpuTimeMs
custom1.SqlLayerCpuTimeMs -= custom2.SqlLayerCpuTimeMs
custom1.KvReadRpcCount -= custom2.KvReadRpcCount
custom1.KvWriteRpcCount -= custom2.KvWriteRpcCount
}
10 changes: 5 additions & 5 deletions pkg/mcs/resource_manager/server/token_buckets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ func TestGroupTokenBucketRequest(t *testing.T) {

gtb := NewGroupTokenBucket(tbSetting)
time1 := time.Now()
tb, trickle := gtb.request(time1, 100000, uint64(time.Second)*10/uint64(time.Millisecond))
re.LessOrEqual(math.Abs(tb.Tokens-100000), 1e-7)
tb, trickle := gtb.request(time1, 190000, uint64(time.Second)*10/uint64(time.Millisecond))
re.LessOrEqual(math.Abs(tb.Tokens-190000), 1e-7)
re.Equal(trickle, int64(0))
// need to lend token
tb, trickle = gtb.request(time1, 101000, uint64(time.Second)*10/uint64(time.Millisecond))
re.LessOrEqual(math.Abs(tb.Tokens-101000), 1e-7)
re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond))
tb, trickle = gtb.request(time1, 11000, uint64(time.Second)*10/uint64(time.Millisecond))
re.LessOrEqual(math.Abs(tb.Tokens-11000), 1e-7)
re.Equal(trickle, int64(time.Second)*11000./4000./int64(time.Millisecond))
tb, trickle = gtb.request(time1, 35000, uint64(time.Second)*10/uint64(time.Millisecond))
re.LessOrEqual(math.Abs(tb.Tokens-35000), 1e-7)
re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond))
Expand Down
14 changes: 13 additions & 1 deletion pkg/mcs/resource_manager/server/token_bukets.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package server

import (
"math"
"time"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -117,10 +118,12 @@ func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPe

// Firstly allocate the remaining tokens
var grantedTokens float64
hasRemaining := false
if t.Tokens > 0 {
grantedTokens = t.Tokens
neededTokens -= grantedTokens
t.Tokens = 0
hasRemaining = true
}

var targetPeriodTime = time.Duration(targetPeriodMs) * time.Millisecond
Expand Down Expand Up @@ -155,6 +158,7 @@ func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPe
if roundReserveTokens > neededTokens {
t.Tokens -= neededTokens
grantedTokens += neededTokens
trickleTime += grantedTokens / fillRate
neededTokens = 0
} else {
roundReserveTime := roundReserveTokens / fillRate
Expand All @@ -177,5 +181,13 @@ func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPe
grantedTokens = defaultReserveRatio * float64(t.Settings.FillRate) * targetPeriodTime.Seconds()
}
res.Tokens = grantedTokens
return &res, targetPeriodTime.Milliseconds()
var trickleDuration time.Duration
// can't directly treat targetPeriodTime as trickleTime when there is a token remaining.
// If treat, client consumption will be slowed down (actually cloud be increased).
if hasRemaining {
trickleDuration = time.Duration(math.Min(trickleTime, targetPeriodTime.Seconds()) * float64(time.Second))
} else {
trickleDuration = targetPeriodTime
}
return &res, trickleDuration.Milliseconds()
}
Loading