-
Notifications
You must be signed in to change notification settings - Fork 728
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
resource_manager: implement token assignment in server #5809
Changes from 3 commits
ca83b1d
321c2ad
0fc2ca5
ef25725
6a13cab
2d2fc55
887d221
d5aa646
5fb09c9
fc04c44
3896b95
3865ab3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,12 +16,16 @@ package server | |
|
||
import ( | ||
"context" | ||
"io" | ||
"net/http" | ||
"time" | ||
|
||
"github.com/pingcap/errors" | ||
rmpb "github.com/pingcap/kvproto/pkg/resource_manager" | ||
"github.com/pingcap/log" | ||
"github.com/tikv/pd/pkg/mcs/registry" | ||
"github.com/tikv/pd/server" | ||
"go.uber.org/zap" | ||
"google.golang.org/grpc" | ||
) | ||
|
||
|
@@ -125,5 +129,50 @@ func (s *Service) ModifyResourceGroup(ctx context.Context, req *rmpb.PutResource | |
|
||
// AcquireTokenBuckets implements ResourceManagerServer.AcquireTokenBuckets. | ||
func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBucketsServer) error { | ||
return errors.New("Not implemented") | ||
for { | ||
select { | ||
case <-s.ctx.Done(): | ||
return errors.New("server closed") | ||
default: | ||
} | ||
request, err := stream.Recv() | ||
if err == io.EOF { | ||
return nil | ||
} | ||
if err != nil { | ||
return errors.WithStack(err) | ||
} | ||
targetPeriodMs := request.GetTargetRequestPeriodMs() | ||
resps := &rmpb.TokenBucketsResponse{} | ||
for _, req := range request.Requests { | ||
rg := s.manager.GetResourceGroup(req.ResourceGroupName) | ||
if rg == nil { | ||
return errors.New("resource group not found") | ||
} | ||
now := time.Now() | ||
resp := &rmpb.TokenBucketResponse{ | ||
ResourceGroupName: rg.Name, | ||
} | ||
switch rg.Mode { | ||
case rmpb.GroupMode_RUMode: | ||
for _, re := range req.GetRuItems().GetRequestRU() { | ||
switch re.Type { | ||
case rmpb.RequestUnitType_RRU: | ||
rg.UpdateRRU(now) | ||
tokens := rg.RequestRRU(re.Value, targetPeriodMs) | ||
resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens) | ||
case rmpb.RequestUnitType_WRU: | ||
rg.UpdateWRU(now) | ||
tokens := rg.RequestWRU(re.Value, targetPeriodMs) | ||
resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens) | ||
} | ||
} | ||
case rmpb.GroupMode_NativeMode: | ||
return errors.New("not supports the resource type") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto. |
||
} | ||
log.Debug("finish token request from", zap.String("resource group", req.ResourceGroupName)) | ||
resps.Responses = append(resps.Responses, resp) | ||
} | ||
stream.Send(resps) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
// Copyright 2022 TiKV Project Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS,g | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package server | ||
|
||
import ( | ||
"math" | ||
"testing" | ||
"time" | ||
|
||
rmpb "github.com/pingcap/kvproto/pkg/resource_manager" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestGroupTokenBucketUpdateAndPatch(t *testing.T) { | ||
re := require.New(t) | ||
tbSetting := &rmpb.TokenBucket{ | ||
Tokens: 200000, | ||
Settings: &rmpb.TokenLimitSettings{ | ||
Fillrate: 2000, | ||
BurstLimit: 20000000, | ||
}, | ||
} | ||
|
||
tb := NewGroupTokenBucket(tbSetting) | ||
time1 := time.Now() | ||
tb.update(time1) | ||
re.LessOrEqual(math.Abs(tbSetting.Tokens-tb.Tokens), 1e-7) | ||
re.Equal(tbSetting.Settings.Fillrate, tb.Settings.Fillrate) | ||
|
||
tbSetting = &rmpb.TokenBucket{ | ||
Tokens: -100000, | ||
Settings: &rmpb.TokenLimitSettings{ | ||
Fillrate: 1000, | ||
BurstLimit: 10000000, | ||
}, | ||
} | ||
tb.patch(tbSetting) | ||
|
||
time2 := time.Now() | ||
tb.update(time2) | ||
re.LessOrEqual(math.Abs(100000-tb.Tokens), time2.Sub(time1).Seconds()*float64(tbSetting.Settings.Fillrate)+1e7) | ||
re.Equal(tbSetting.Settings.Fillrate, tb.Settings.Fillrate) | ||
} | ||
|
||
func TestGroupTokenBucketRequest(t *testing.T) { | ||
re := require.New(t) | ||
tbSetting := &rmpb.TokenBucket{ | ||
Tokens: 200000, | ||
Settings: &rmpb.TokenLimitSettings{ | ||
Fillrate: 2000, | ||
BurstLimit: 20000000, | ||
}, | ||
} | ||
|
||
gtb := NewGroupTokenBucket(tbSetting) | ||
time1 := time.Now() | ||
gtb.update(time1) | ||
tb, trickle := gtb.request(100000, uint64(time.Second)*10/uint64(time.Millisecond)) | ||
re.LessOrEqual(math.Abs(tb.Tokens-100000), 1e-7) | ||
re.Equal(trickle, int64(0)) | ||
tb, trickle = gtb.request(101000, uint64(time.Second)*10/uint64(time.Millisecond)) | ||
re.LessOrEqual(math.Abs(tb.Tokens-101000), 1e-7) | ||
re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) | ||
re.Equal(*gtb.LoanExpireTime, time1.Add(gtb.LoanMaxPeriod)) | ||
time2 := time.Now() | ||
gtb.update(time2) | ||
tb, trickle = gtb.request(100000, uint64(time.Second)*10/uint64(time.Millisecond)) | ||
re.LessOrEqual(math.Abs(tb.Tokens-19000*(1-loanReserveRatio)), time1.Add(gtb.LoanMaxPeriod).Sub(time2).Seconds()*(1-loanReserveRatio)*float64(tb.Settings.Fillrate)+1e7) | ||
re.Equal(trickle, time1.Add(gtb.LoanMaxPeriod).Sub(time2).Milliseconds()) | ||
tb, trickle = gtb.request(2000, uint64(time.Second)*10/uint64(time.Millisecond)) | ||
re.LessOrEqual(tb.Tokens, time1.Add(gtb.LoanMaxPeriod).Sub(time2).Seconds()*loanReserveRatio*float64(tb.Settings.Fillrate)+1e7) | ||
re.Equal(trickle, time1.Add(gtb.LoanMaxPeriod).Sub(time2).Milliseconds()) | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -21,16 +21,32 @@ import ( | |||||
rmpb "github.com/pingcap/kvproto/pkg/resource_manager" | ||||||
) | ||||||
|
||||||
const defaultRefillRate = 10000 | ||||||
|
||||||
const defaultInitialTokens = 10 * 10000 | ||||||
|
||||||
const defaultMaxTokens = 1e7 | ||||||
|
||||||
const defaultLoanMaxPeriod = 10 * time.Second | ||||||
|
||||||
var loanReserveRatio float64 = 0.05 | ||||||
|
||||||
// GroupTokenBucket is a token bucket for a resource group. | ||||||
// TODO: statistics Consumption | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
type GroupTokenBucket struct { | ||||||
*rmpb.TokenBucket `json:"token_bucket,omitempty"` | ||||||
Consumption *rmpb.TokenBucketsRequest `json:"consumption,omitempty"` | ||||||
LastUpdate *time.Time `json:"last_update,omitempty"` | ||||||
Initialized bool `json:"initialized"` | ||||||
LoanExpireTime *time.Time `json:"loan_time,omitempty"` | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add comments about the loan. |
||||||
LoanMaxPeriod time.Duration `json:"loan_max_perio,omitempty"` | ||||||
MaxTokens float64 `json:"max_tokens,omitempty"` | ||||||
} | ||||||
|
||||||
func NewGroupTokenBucket(tokenBucket *rmpb.TokenBucket) GroupTokenBucket { | ||||||
return GroupTokenBucket{ | ||||||
TokenBucket: tokenBucket, | ||||||
MaxTokens: defaultMaxTokens, | ||||||
LoanMaxPeriod: defaultLoanMaxPeriod, | ||||||
} | ||||||
} | ||||||
|
||||||
// patch patches the token bucket settings. | ||||||
|
@@ -51,11 +67,12 @@ func (t *GroupTokenBucket) patch(settings *rmpb.TokenBucket) { | |||||
t.TokenBucket = tb | ||||||
} | ||||||
|
||||||
// Update updates the token bucket. | ||||||
func (t *GroupTokenBucket) Update(now time.Time) { | ||||||
// update updates the token bucket. | ||||||
func (t *GroupTokenBucket) update(now time.Time) { | ||||||
if !t.Initialized { | ||||||
t.Settings.Fillrate = defaultRefillRate | ||||||
t.Tokens = defaultInitialTokens | ||||||
if t.Tokens < defaultInitialTokens { | ||||||
t.Tokens = defaultInitialTokens | ||||||
} | ||||||
t.LastUpdate = &now | ||||||
t.Initialized = true | ||||||
return | ||||||
|
@@ -66,12 +83,62 @@ func (t *GroupTokenBucket) Update(now time.Time) { | |||||
t.Tokens += float64(t.Settings.Fillrate) * delta.Seconds() | ||||||
t.LastUpdate = &now | ||||||
} | ||||||
if t.Tokens >= 0 { | ||||||
t.LoanExpireTime = nil | ||||||
} | ||||||
if t.Tokens > defaultMaxTokens { | ||||||
t.Tokens = defaultMaxTokens | ||||||
} | ||||||
} | ||||||
|
||||||
// Request requests tokens from the token bucket. | ||||||
func (t *GroupTokenBucket) Request( | ||||||
// request requests tokens from the token bucket. | ||||||
func (t *GroupTokenBucket) request( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think |
||||||
neededTokens float64, targetPeriodMs uint64, | ||||||
) *rmpb.TokenBucket { | ||||||
// TODO: Implement the token bucket algorithm. | ||||||
return nil | ||||||
) (*rmpb.TokenBucket, int64) { | ||||||
var res rmpb.TokenBucket | ||||||
res.Settings = &rmpb.TokenLimitSettings{} | ||||||
// TODO: consider the shares for dispatch the fill rate | ||||||
res.Settings.Fillrate = 0 | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's used for the token server unavailable in abnormal situation. |
||||||
if neededTokens <= 0 { | ||||||
return &res, 0 | ||||||
} | ||||||
|
||||||
if t.Tokens >= neededTokens { | ||||||
t.Tokens -= neededTokens | ||||||
// granted the total request tokens | ||||||
res.Tokens = neededTokens | ||||||
return &res, 0 | ||||||
} | ||||||
|
||||||
var grantedTokens float64 | ||||||
if t.Tokens > 0 { | ||||||
grantedTokens = t.Tokens | ||||||
t.Tokens = 0 | ||||||
neededTokens -= grantedTokens | ||||||
} | ||||||
|
||||||
var periodFilled float64 | ||||||
var trickleTime = int64(targetPeriodMs) | ||||||
if t.LoanExpireTime != nil && t.LoanExpireTime.After(*t.LastUpdate) { | ||||||
duration := t.LoanExpireTime.Sub(*t.LastUpdate) | ||||||
periodFilled = float64(t.Settings.Fillrate) * (1 - loanReserveRatio) * duration.Seconds() | ||||||
trickleTime = duration.Milliseconds() | ||||||
} else { | ||||||
et := t.LastUpdate.Add(t.LoanMaxPeriod) | ||||||
t.LoanExpireTime = &et | ||||||
periodFilled = float64(t.Settings.Fillrate) * (1 - loanReserveRatio) * t.LoanMaxPeriod.Seconds() | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to ensure There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and why not There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. periodFilled is the number of tokens that can be allocated in whole loan period, so I think we should use LoanMaxPeriod There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it will make the tokens not be graceful consumed with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess that you want the consumption curve to be smoother. To fit this situation, we can use shorter |
||||||
} | ||||||
periodFilled += t.Tokens | ||||||
if periodFilled <= float64(t.Settings.Fillrate)*loanReserveRatio { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. need |
||||||
periodFilled = float64(t.Settings.Fillrate) * loanReserveRatio | ||||||
} | ||||||
if periodFilled >= neededTokens { | ||||||
grantedTokens += neededTokens | ||||||
t.Tokens -= neededTokens | ||||||
} else { | ||||||
grantedTokens += periodFilled | ||||||
t.Tokens -= periodFilled | ||||||
} | ||||||
res.Tokens = grantedTokens | ||||||
return &res, trickleTime | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A logical error should not exit the main loop. Should set a error message in the response instead.