diff --git a/pkg/mcs/resource_manager/server/grpc_service.go b/pkg/mcs/resource_manager/server/grpc_service.go index a0f9c1c5c38..a2c827de334 100644 --- a/pkg/mcs/resource_manager/server/grpc_service.go +++ b/pkg/mcs/resource_manager/server/grpc_service.go @@ -16,12 +16,16 @@ package server import ( "context" + "io" "net/http" + "time" "github.com/pingcap/errors" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/pingcap/log" "github.com/tikv/pd/pkg/mcs/registry" "github.com/tikv/pd/server" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -125,5 +129,50 @@ func (s *Service) ModifyResourceGroup(ctx context.Context, req *rmpb.PutResource // AcquireTokenBuckets implements ResourceManagerServer.AcquireTokenBuckets. func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBucketsServer) error { - return errors.New("Not implemented") + for { + select { + case <-s.ctx.Done(): + return errors.New("server closed") + default: + } + request, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return errors.WithStack(err) + } + targetPeriodMs := request.GetTargetRequestPeriodMs() + resps := &rmpb.TokenBucketsResponse{} + for _, req := range request.Requests { + rg := s.manager.GetResourceGroup(req.ResourceGroupName) + if rg == nil { + log.Warn("resource group not found", zap.String("resource-group", req.ResourceGroupName)) + continue + } + now := time.Now() + resp := &rmpb.TokenBucketResponse{ + ResourceGroupName: rg.Name, + } + switch rg.Mode { + case rmpb.GroupMode_RUMode: + var tokens *rmpb.GrantedRUTokenBucket + for _, re := range req.GetRuItems().GetRequestRU() { + switch re.Type { + case rmpb.RequestUnitType_RRU: + tokens = rg.RequestRRU(now, re.Value, targetPeriodMs) + case rmpb.RequestUnitType_WRU: + tokens = rg.RequestWRU(now, re.Value, targetPeriodMs) + } + resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens) + } + case rmpb.GroupMode_RawMode: + log.Warn("not supports the resource type", zap.String("resource-group", req.ResourceGroupName), zap.String("mode", rmpb.GroupMode_name[int32(rmpb.GroupMode_RawMode)])) + continue + } + log.Debug("finish token request from", zap.String("resource group", req.ResourceGroupName)) + resps.Responses = append(resps.Responses, resp) + } + stream.Send(resps) + } } diff --git a/pkg/mcs/resource_manager/server/resource_group.go b/pkg/mcs/resource_manager/server/resource_group.go index 3058724a7fe..4a711f3596a 100644 --- a/pkg/mcs/resource_manager/server/resource_group.go +++ b/pkg/mcs/resource_manager/server/resource_group.go @@ -19,6 +19,7 @@ import ( "encoding/json" "path" "sync" + "time" "github.com/pingcap/errors" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" @@ -153,27 +154,17 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup { case rmpb.GroupMode_RUMode: if settings := group.GetRUSettings(); settings != nil { ruSettings = &RequestUnitSettings{ - RRU: GroupTokenBucket{ - TokenBucket: settings.GetRRU(), - }, - WRU: GroupTokenBucket{ - TokenBucket: settings.GetWRU(), - }, + RRU: NewGroupTokenBucket(settings.GetRRU()), + WRU: NewGroupTokenBucket(settings.GetWRU()), } rg.RUSettings = ruSettings } case rmpb.GroupMode_RawMode: if settings := group.GetResourceSettings(); settings != nil { resourceSettings = &NativeResourceSettings{ - CPU: GroupTokenBucket{ - TokenBucket: settings.GetCpu(), - }, - IOReadBandwidth: GroupTokenBucket{ - TokenBucket: settings.GetIoRead(), - }, - IOWriteBandwidth: GroupTokenBucket{ - TokenBucket: settings.GetIoWrite(), - }, + CPU: NewGroupTokenBucket(settings.GetCpu()), + IOReadBandwidth: NewGroupTokenBucket(settings.GetIoRead()), + IOWriteBandwidth: NewGroupTokenBucket(settings.GetIoWrite()), } rg.ResourceSettings = resourceSettings } @@ -181,6 +172,28 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup { return rg } +// RequestRRU requests the RRU of the resource group. +func (rg *ResourceGroup) RequestRRU(now time.Time, neededTokens float64, targetPeriodMs uint64) *rmpb.GrantedRUTokenBucket { + rg.Lock() + defer rg.Unlock() + if rg.RUSettings == nil { + return nil + } + tb, trickleTimeMs := rg.RUSettings.RRU.request(now, neededTokens, targetPeriodMs) + return &rmpb.GrantedRUTokenBucket{Type: rmpb.RequestUnitType_RRU, GrantedTokens: tb, TrickleTimeMs: trickleTimeMs} +} + +// RequestWRU requests the WRU of the resource group. +func (rg *ResourceGroup) RequestWRU(now time.Time, neededTokens float64, targetPeriodMs uint64) *rmpb.GrantedRUTokenBucket { + rg.Lock() + defer rg.Unlock() + if rg.RUSettings == nil { + return nil + } + tb, trickleTimeMs := rg.RUSettings.WRU.request(now, neededTokens, targetPeriodMs) + return &rmpb.GrantedRUTokenBucket{Type: rmpb.RequestUnitType_WRU, GrantedTokens: tb, TrickleTimeMs: trickleTimeMs} +} + // IntoProtoResourceGroup converts a ResourceGroup to a rmpb.ResourceGroup. func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup { rg.RLock() diff --git a/pkg/mcs/resource_manager/server/token_buckets_test.go b/pkg/mcs/resource_manager/server/token_buckets_test.go new file mode 100644 index 00000000000..a7ecbe81d77 --- /dev/null +++ b/pkg/mcs/resource_manager/server/token_buckets_test.go @@ -0,0 +1,92 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "math" + "testing" + "time" + + rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/stretchr/testify/require" +) + +func TestGroupTokenBucketUpdateAndPatch(t *testing.T) { + re := require.New(t) + tbSetting := &rmpb.TokenBucket{ + Tokens: 200000, + Settings: &rmpb.TokenLimitSettings{ + FillRate: 2000, + BurstLimit: 20000000, + }, + } + + tb := NewGroupTokenBucket(tbSetting) + time1 := time.Now() + tb.request(time1, 0, 0) + re.LessOrEqual(math.Abs(tbSetting.Tokens-tb.Tokens), 1e-7) + re.Equal(tbSetting.Settings.FillRate, tb.Settings.FillRate) + + tbSetting = &rmpb.TokenBucket{ + Tokens: -100000, + Settings: &rmpb.TokenLimitSettings{ + FillRate: 1000, + BurstLimit: 10000000, + }, + } + tb.patch(tbSetting) + + time2 := time.Now() + tb.request(time2, 0, 0) + re.LessOrEqual(math.Abs(100000-tb.Tokens), time2.Sub(time1).Seconds()*float64(tbSetting.Settings.FillRate)+1e7) + re.Equal(tbSetting.Settings.FillRate, tb.Settings.FillRate) +} + +func TestGroupTokenBucketRequest(t *testing.T) { + re := require.New(t) + tbSetting := &rmpb.TokenBucket{ + Tokens: 200000, + Settings: &rmpb.TokenLimitSettings{ + FillRate: 2000, + BurstLimit: 20000000, + }, + } + + gtb := NewGroupTokenBucket(tbSetting) + time1 := time.Now() + tb, trickle := gtb.request(time1, 100000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-100000), 1e-7) + re.Equal(trickle, int64(0)) + // need to lend token + tb, trickle = gtb.request(time1, 101000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-101000), 1e-7) + re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) + tb, trickle = gtb.request(time1, 35000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-35000), 1e-7) + re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) + tb, trickle = gtb.request(time1, 60000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-22000), 1e-7) + re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) + tb, trickle = gtb.request(time1, 3000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-2000), 1e-7) + re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) + tb, trickle = gtb.request(time1, 3000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-1000), 1e-7) + re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) + time2 := time1.Add(10 * time.Second) + tb, trickle = gtb.request(time2, 20000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-19000), 1e-7) + re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) +} diff --git a/pkg/mcs/resource_manager/server/token_bukets.go b/pkg/mcs/resource_manager/server/token_bukets.go index d3efcfd9f1b..4f6d294a82e 100644 --- a/pkg/mcs/resource_manager/server/token_bukets.go +++ b/pkg/mcs/resource_manager/server/token_bukets.go @@ -21,16 +21,35 @@ import ( rmpb "github.com/pingcap/kvproto/pkg/resource_manager" ) -const defaultRefillRate = 10000 +const ( + defaultRefillRate = 10000 + defaultInitialTokens = 10 * 10000 + defaultMaxTokens = 1e7 +) -const defaultInitialTokens = 10 * 10000 +const ( + defaultReserveRatio = 0.05 + defaultLoanCoefficient = 2 +) // GroupTokenBucket is a token bucket for a resource group. +// TODO: statistics consumption @JmPotato type GroupTokenBucket struct { *rmpb.TokenBucket `json:"token_bucket,omitempty"` - Consumption *rmpb.TokenBucketsRequest `json:"consumption,omitempty"` - LastUpdate *time.Time `json:"last_update,omitempty"` - Initialized bool `json:"initialized"` + // MaxTokens limits the number of tokens that can be accumulated + MaxTokens float64 `json:"max_tokens,omitempty"` + + Consumption *rmpb.Consumption `json:"consumption,omitempty"` + LastUpdate *time.Time `json:"last_update,omitempty"` + Initialized bool `json:"initialized"` +} + +// NewGroupTokenBucket returns a new GroupTokenBucket +func NewGroupTokenBucket(tokenBucket *rmpb.TokenBucket) GroupTokenBucket { + return GroupTokenBucket{ + TokenBucket: tokenBucket, + MaxTokens: defaultMaxTokens, + } } // patch patches the token bucket settings. @@ -51,31 +70,113 @@ func (t *GroupTokenBucket) patch(settings *rmpb.TokenBucket) { t.TokenBucket = tb } -// Update updates the token bucket. -func (t *GroupTokenBucket) Update(now time.Time) { +// init initializes the group token bucket. +func (t *GroupTokenBucket) init(now time.Time) { + if t.Settings.FillRate == 0 { + t.Settings.FillRate = defaultRefillRate + } + if t.Tokens < defaultInitialTokens { + t.Tokens = defaultInitialTokens + } + // TODO: If we support init or modify MaxTokens in the future, we can move following code. + if t.Tokens > t.MaxTokens { + t.MaxTokens = t.Tokens + } + t.LastUpdate = &now + t.Initialized = true +} + +// request requests tokens from the group token bucket. +func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPeriodMs uint64) (*rmpb.TokenBucket, int64) { if !t.Initialized { - if t.Settings.FillRate == 0 { - t.Settings.FillRate = defaultRefillRate + t.init(now) + } else { + delta := now.Sub(*t.LastUpdate) + if delta > 0 { + t.Tokens += float64(t.Settings.FillRate) * delta.Seconds() + t.LastUpdate = &now } - if t.Tokens < defaultInitialTokens { - t.Tokens = defaultInitialTokens + if t.Tokens > t.MaxTokens { + t.Tokens = t.MaxTokens } - t.LastUpdate = &now - t.Initialized = true - return } - delta := now.Sub(*t.LastUpdate) - if delta > 0 { - t.Tokens += float64(t.Settings.FillRate) * delta.Seconds() - t.LastUpdate = &now + var res rmpb.TokenBucket + res.Settings = &rmpb.TokenLimitSettings{} + // FillRate is used for the token server unavailable in abnormal situation. + if neededTokens <= 0 { + return &res, 0 } -} -// Request requests tokens from the token bucket. -func (t *GroupTokenBucket) Request( - neededTokens float64, targetPeriodMs uint64, -) *rmpb.TokenBucket { - // TODO: Implement the token bucket algorithm. - return nil + // If the current tokens can directly meet the requirement, returns the need token + if t.Tokens >= neededTokens { + t.Tokens -= neededTokens + // granted the total request tokens + res.Tokens = neededTokens + return &res, 0 + } + + // Firstly allocate the remaining tokens + var grantedTokens float64 + if t.Tokens > 0 { + grantedTokens = t.Tokens + neededTokens -= grantedTokens + t.Tokens = 0 + } + + var targetPeriodTime = time.Duration(targetPeriodMs) * time.Millisecond + var trickleTime = 0. + + // When there are loan, the allotment will match the fill rate. + // We will have k threshold, beyond which the token allocation will be a minimum. + // The threshold unit is `fill rate * target period`. + // | + // k*fill_rate |* * * * * * * + // | * + // *** | * + // | * + // | * + // fill_rate | * + // reserve_rate | * + // | + // grant_rate 0 ------------------------------------------------------------------------------------ + // loan *** k*period_token (k+k-1)*period_token *** (k+k+1...+1)*period_token + p := make([]float64, defaultLoanCoefficient) + p[0] = float64(defaultLoanCoefficient) * float64(t.Settings.FillRate) * targetPeriodTime.Seconds() + for i := 1; i < defaultLoanCoefficient; i++ { + p[i] = float64(defaultLoanCoefficient-i)*float64(t.Settings.FillRate)*targetPeriodTime.Seconds() + p[i-1] + } + for i := 0; i < defaultLoanCoefficient && neededTokens > 0 && trickleTime < targetPeriodTime.Seconds(); i++ { + loan := -t.Tokens + if loan > p[i] { + continue + } + roundReserveTokens := p[i] - loan + fillRate := float64(defaultLoanCoefficient-i) * float64(t.Settings.FillRate) + if roundReserveTokens > neededTokens { + t.Tokens -= neededTokens + grantedTokens += neededTokens + neededTokens = 0 + } else { + roundReserveTime := roundReserveTokens / fillRate + if roundReserveTime+trickleTime >= targetPeriodTime.Seconds() { + roundTokens := (targetPeriodTime.Seconds() - trickleTime) * fillRate + neededTokens -= roundTokens + t.Tokens -= roundTokens + grantedTokens += roundTokens + trickleTime = targetPeriodTime.Seconds() + } else { + grantedTokens += roundReserveTokens + neededTokens -= roundReserveTokens + t.Tokens -= roundReserveTokens + trickleTime += roundReserveTime + } + } + } + if grantedTokens < defaultReserveRatio*float64(t.Settings.FillRate)*targetPeriodTime.Seconds() { + t.Tokens -= defaultReserveRatio*float64(t.Settings.FillRate)*targetPeriodTime.Seconds() - grantedTokens + grantedTokens = defaultReserveRatio * float64(t.Settings.FillRate) * targetPeriodTime.Seconds() + } + res.Tokens = grantedTokens + return &res, targetPeriodTime.Milliseconds() }