Skip to content

Commit

Permalink
resource_manager: implement token assignment in server (#5809)
Browse files Browse the repository at this point in the history
close #5822

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
CabinfeverB and ti-chi-bot authored Jan 10, 2023
1 parent 1b778f2 commit 4363701
Show file tree
Hide file tree
Showing 4 changed files with 296 additions and 41 deletions.
51 changes: 50 additions & 1 deletion pkg/mcs/resource_manager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ package server

import (
"context"
"io"
"net/http"
"time"

"github.com/pingcap/errors"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/server"
"go.uber.org/zap"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -125,5 +129,50 @@ func (s *Service) ModifyResourceGroup(ctx context.Context, req *rmpb.PutResource

// AcquireTokenBuckets implements ResourceManagerServer.AcquireTokenBuckets.
func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBucketsServer) error {
return errors.New("Not implemented")
for {
select {
case <-s.ctx.Done():
return errors.New("server closed")
default:
}
request, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return errors.WithStack(err)
}
targetPeriodMs := request.GetTargetRequestPeriodMs()
resps := &rmpb.TokenBucketsResponse{}
for _, req := range request.Requests {
rg := s.manager.GetResourceGroup(req.ResourceGroupName)
if rg == nil {
log.Warn("resource group not found", zap.String("resource-group", req.ResourceGroupName))
continue
}
now := time.Now()
resp := &rmpb.TokenBucketResponse{
ResourceGroupName: rg.Name,
}
switch rg.Mode {
case rmpb.GroupMode_RUMode:
var tokens *rmpb.GrantedRUTokenBucket
for _, re := range req.GetRuItems().GetRequestRU() {
switch re.Type {
case rmpb.RequestUnitType_RRU:
tokens = rg.RequestRRU(now, re.Value, targetPeriodMs)
case rmpb.RequestUnitType_WRU:
tokens = rg.RequestWRU(now, re.Value, targetPeriodMs)
}
resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens)
}
case rmpb.GroupMode_RawMode:
log.Warn("not supports the resource type", zap.String("resource-group", req.ResourceGroupName), zap.String("mode", rmpb.GroupMode_name[int32(rmpb.GroupMode_RawMode)]))
continue
}
log.Debug("finish token request from", zap.String("resource group", req.ResourceGroupName))
resps.Responses = append(resps.Responses, resp)
}
stream.Send(resps)
}
}
43 changes: 28 additions & 15 deletions pkg/mcs/resource_manager/server/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"path"
"sync"
"time"

"github.com/pingcap/errors"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
Expand Down Expand Up @@ -153,34 +154,46 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup {
case rmpb.GroupMode_RUMode:
if settings := group.GetRUSettings(); settings != nil {
ruSettings = &RequestUnitSettings{
RRU: GroupTokenBucket{
TokenBucket: settings.GetRRU(),
},
WRU: GroupTokenBucket{
TokenBucket: settings.GetWRU(),
},
RRU: NewGroupTokenBucket(settings.GetRRU()),
WRU: NewGroupTokenBucket(settings.GetWRU()),
}
rg.RUSettings = ruSettings
}
case rmpb.GroupMode_RawMode:
if settings := group.GetResourceSettings(); settings != nil {
resourceSettings = &NativeResourceSettings{
CPU: GroupTokenBucket{
TokenBucket: settings.GetCpu(),
},
IOReadBandwidth: GroupTokenBucket{
TokenBucket: settings.GetIoRead(),
},
IOWriteBandwidth: GroupTokenBucket{
TokenBucket: settings.GetIoWrite(),
},
CPU: NewGroupTokenBucket(settings.GetCpu()),
IOReadBandwidth: NewGroupTokenBucket(settings.GetIoRead()),
IOWriteBandwidth: NewGroupTokenBucket(settings.GetIoWrite()),
}
rg.ResourceSettings = resourceSettings
}
}
return rg
}

// RequestRRU requests the RRU of the resource group.
func (rg *ResourceGroup) RequestRRU(now time.Time, neededTokens float64, targetPeriodMs uint64) *rmpb.GrantedRUTokenBucket {
rg.Lock()
defer rg.Unlock()
if rg.RUSettings == nil {
return nil
}
tb, trickleTimeMs := rg.RUSettings.RRU.request(now, neededTokens, targetPeriodMs)
return &rmpb.GrantedRUTokenBucket{Type: rmpb.RequestUnitType_RRU, GrantedTokens: tb, TrickleTimeMs: trickleTimeMs}
}

// RequestWRU requests the WRU of the resource group.
func (rg *ResourceGroup) RequestWRU(now time.Time, neededTokens float64, targetPeriodMs uint64) *rmpb.GrantedRUTokenBucket {
rg.Lock()
defer rg.Unlock()
if rg.RUSettings == nil {
return nil
}
tb, trickleTimeMs := rg.RUSettings.WRU.request(now, neededTokens, targetPeriodMs)
return &rmpb.GrantedRUTokenBucket{Type: rmpb.RequestUnitType_WRU, GrantedTokens: tb, TrickleTimeMs: trickleTimeMs}
}

// IntoProtoResourceGroup converts a ResourceGroup to a rmpb.ResourceGroup.
func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup {
rg.RLock()
Expand Down
92 changes: 92 additions & 0 deletions pkg/mcs/resource_manager/server/token_buckets_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,g
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"math"
"testing"
"time"

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/require"
)

func TestGroupTokenBucketUpdateAndPatch(t *testing.T) {
re := require.New(t)
tbSetting := &rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: 2000,
BurstLimit: 20000000,
},
}

tb := NewGroupTokenBucket(tbSetting)
time1 := time.Now()
tb.request(time1, 0, 0)
re.LessOrEqual(math.Abs(tbSetting.Tokens-tb.Tokens), 1e-7)
re.Equal(tbSetting.Settings.FillRate, tb.Settings.FillRate)

tbSetting = &rmpb.TokenBucket{
Tokens: -100000,
Settings: &rmpb.TokenLimitSettings{
FillRate: 1000,
BurstLimit: 10000000,
},
}
tb.patch(tbSetting)

time2 := time.Now()
tb.request(time2, 0, 0)
re.LessOrEqual(math.Abs(100000-tb.Tokens), time2.Sub(time1).Seconds()*float64(tbSetting.Settings.FillRate)+1e7)
re.Equal(tbSetting.Settings.FillRate, tb.Settings.FillRate)
}

func TestGroupTokenBucketRequest(t *testing.T) {
re := require.New(t)
tbSetting := &rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
FillRate: 2000,
BurstLimit: 20000000,
},
}

gtb := NewGroupTokenBucket(tbSetting)
time1 := time.Now()
tb, trickle := gtb.request(time1, 100000, uint64(time.Second)*10/uint64(time.Millisecond))
re.LessOrEqual(math.Abs(tb.Tokens-100000), 1e-7)
re.Equal(trickle, int64(0))
// need to lend token
tb, trickle = gtb.request(time1, 101000, uint64(time.Second)*10/uint64(time.Millisecond))
re.LessOrEqual(math.Abs(tb.Tokens-101000), 1e-7)
re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond))
tb, trickle = gtb.request(time1, 35000, uint64(time.Second)*10/uint64(time.Millisecond))
re.LessOrEqual(math.Abs(tb.Tokens-35000), 1e-7)
re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond))
tb, trickle = gtb.request(time1, 60000, uint64(time.Second)*10/uint64(time.Millisecond))
re.LessOrEqual(math.Abs(tb.Tokens-22000), 1e-7)
re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond))
tb, trickle = gtb.request(time1, 3000, uint64(time.Second)*10/uint64(time.Millisecond))
re.LessOrEqual(math.Abs(tb.Tokens-2000), 1e-7)
re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond))
tb, trickle = gtb.request(time1, 3000, uint64(time.Second)*10/uint64(time.Millisecond))
re.LessOrEqual(math.Abs(tb.Tokens-1000), 1e-7)
re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond))
time2 := time1.Add(10 * time.Second)
tb, trickle = gtb.request(time2, 20000, uint64(time.Second)*10/uint64(time.Millisecond))
re.LessOrEqual(math.Abs(tb.Tokens-19000), 1e-7)
re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond))
}
Loading

0 comments on commit 4363701

Please sign in to comment.