Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_manager: implement token assignment in server #5809

Merged
merged 12 commits into from
Jan 10, 2023
51 changes: 50 additions & 1 deletion pkg/mcs/resource_manager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ package server

import (
"context"
"io"
"net/http"
"time"

"github.com/pingcap/errors"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/server"
"go.uber.org/zap"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -125,5 +129,50 @@ func (s *Service) ModifyResourceGroup(ctx context.Context, req *rmpb.PutResource

// AcquireTokenBuckets implements ResourceManagerServer.AcquireTokenBuckets.
func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBucketsServer) error {
return errors.New("Not implemented")
for {
select {
case <-s.ctx.Done():
return errors.New("server closed")
default:
}
request, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return errors.WithStack(err)
}
targetPeriodMs := request.GetTargetRequestPeriodMs()
resps := &rmpb.TokenBucketsResponse{}
for _, req := range request.Requests {
rg := s.manager.GetResourceGroup(req.ResourceGroupName)
if rg == nil {
return errors.New("resource group not found")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A logical error should not exit the main loop. Should set a error message in the response instead.

}
now := time.Now()
resp := &rmpb.TokenBucketResponse{
ResourceGroupName: rg.Name,
}
switch rg.Mode {
case rmpb.GroupMode_RUMode:
for _, re := range req.GetRuItems().GetRequestRU() {
switch re.Type {
case rmpb.RequestUnitType_RRU:
rg.UpdateRRU(now)
tokens := rg.RequestRRU(float64(re.Value), targetPeriodMs)
resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens)
case rmpb.RequestUnitType_WRU:
rg.UpdateWRU(now)
tokens := rg.RequestWRU(float64(re.Value), targetPeriodMs)
resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens)
}
}
case rmpb.GroupMode_NativeMode:
return errors.New("not supports the resource type")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

}
log.Debug("finish token request from", zap.String("resource group", req.ResourceGroupName))
resps.Responses = append(resps.Responses, resp)
}
stream.Send(resps)
}
}
85 changes: 78 additions & 7 deletions pkg/mcs/resource_manager/server/token_bukets.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,30 @@ const defaultRefillRate = 10000

const defaultInitialTokens = 10 * 10000

const defaultMaxTokens = 1e7

const defaultLoanMaxPeriod = 10 * time.Second

var loanReserveRatio float64 = 0.05

// GroupTokenBucket is a token bucket for a resource group.
// TODO: statistics Consumption
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// TODO: statistics Consumption
// TODO: statistics consumption @JmPotato

type GroupTokenBucket struct {
*rmpb.TokenBucket `json:"token_bucket,omitempty"`
Consumption *rmpb.TokenBucketsRequest `json:"consumption,omitempty"`
LastUpdate *time.Time `json:"last_update,omitempty"`
Initialized bool `json:"initialized"`
LoanExpireTime *time.Time `json:"loan_time,omitempty"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comments about the loan.

LoanMaxPeriod time.Duration `json:"loan_max_perio,omitempty"`
MaxTokens float64 `json:"max_tokens,omitempty"`
}

func NewGroupTokenBucket(tokenBucket *rmpb.TokenBucket) GroupTokenBucket {
return GroupTokenBucket{
TokenBucket: tokenBucket,
MaxTokens: defaultMaxTokens,
LoanMaxPeriod: defaultLoanMaxPeriod,
}
}

// patch patches the token bucket settings.
Expand All @@ -51,8 +69,8 @@ func (t *GroupTokenBucket) patch(settings *rmpb.TokenBucket) {
t.TokenBucket = tb
}

// Update updates the token bucket.
func (t *GroupTokenBucket) Update(now time.Time) {
// update updates the token bucket.
func (t *GroupTokenBucket) update(now time.Time) {
if !t.Initialized {
t.Settings.Fillrate = defaultRefillRate
t.Tokens = defaultInitialTokens
Expand All @@ -66,12 +84,65 @@ func (t *GroupTokenBucket) Update(now time.Time) {
t.Tokens += float64(t.Settings.Fillrate) * delta.Seconds()
t.LastUpdate = &now
}
if t.Tokens >= 0 {
t.LoanExpireTime = nil
}
if t.Tokens > defaultMaxTokens {
t.Tokens = defaultMaxTokens
}

}

// Request requests tokens from the token bucket.
func (t *GroupTokenBucket) Request(
// request requests tokens from the token bucket.
func (t *GroupTokenBucket) request(
Copy link
Contributor

@nolouch nolouch Jan 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think request and update can be one function, also reduce the code functions.

neededTokens float64, targetPeriodMs uint64,
) *rmpb.TokenBucket {
// TODO: Implement the token bucket algorithm.
return nil
) (*rmpb.TokenBucket, int64) {
var res rmpb.TokenBucket
res.Settings = &rmpb.TokenLimitSettings{}
// TODO: consider the shares for dispatch the fill rate
res.Settings.Fillrate = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's used for the token server unavailable in abnormal situation.


if neededTokens <= 0 {
return &res, 0
}

if t.Tokens >= neededTokens {
t.Tokens -= neededTokens
// granted the total request tokens
res.Tokens = neededTokens
return &res, 0
}

var grantedTokens float64
if t.Tokens > 0 {
grantedTokens = t.Tokens
t.Tokens = 0
neededTokens -= grantedTokens
}

now := time.Now()
var periodFilled float64
var trickleTime int64 = int64(targetPeriodMs)
if t.LoanExpireTime != nil && t.LoanExpireTime.After(now) {
duration := t.LoanExpireTime.Sub(now)
periodFilled = float64(t.Settings.Fillrate) * (1 - loanReserveRatio) * duration.Seconds()
trickleTime = duration.Milliseconds()
} else {
now.Add(t.LoanMaxPeriod)
t.LoanExpireTime = &now
periodFilled = float64(t.Settings.Fillrate) * (1 - loanReserveRatio) * t.LoanMaxPeriod.Seconds()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to ensure LoadMaxPeriod period greater than targetPeriodMs ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and why not periodFilled = float64(t.Settings.Fillrate) * (1 - loanReserveRatio) * trickleTime

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

periodFilled is the number of tokens that can be allocated in whole loan period, so I think we should use LoanMaxPeriod

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will make the tokens not be graceful consumed with Fillrate in the loan period. Client will consume more tokens in target period(loan expire period> target period)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that you want the consumption curve to be smoother. To fit this situation, we can use shorter LoanMaxPeriod. We can use long LoanMaxPeriod for high credit or important resource group, so they can respond more promptly to qps peaks.

}
periodFilled += t.Tokens
if periodFilled <= float64(t.Settings.Fillrate)*loanReserveRatio {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need * trickleTime?

periodFilled = float64(t.Settings.Fillrate) * loanReserveRatio
}
if periodFilled >= neededTokens {
grantedTokens += neededTokens
t.Tokens -= neededTokens
} else {
grantedTokens += periodFilled
t.Tokens -= periodFilled
}
res.Tokens = grantedTokens
return &res, trickleTime
}
61 changes: 46 additions & 15 deletions pkg/mcs/resource_manager/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package server
import (
"encoding/json"
"sync"
"time"

"github.com/pingcap/errors"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
Expand Down Expand Up @@ -146,34 +147,64 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup {
case rmpb.GroupMode_RUMode:
if settings := group.GetSettings().GetRUSettings(); settings != nil {
ruSettings = &RequestUnitSettings{
RRU: GroupTokenBucket{
TokenBucket: settings.GetRRU(),
},
WRU: GroupTokenBucket{
TokenBucket: settings.GetWRU(),
},
RRU: NewGroupTokenBucket(settings.GetRRU()),
WRU: NewGroupTokenBucket(settings.GetWRU()),
}
rg.RUSettings = ruSettings
}
case rmpb.GroupMode_NativeMode:
if settings := group.GetSettings().GetResourceSettings(); settings != nil {
resourceSettings = &NativeResourceSettings{
CPU: GroupTokenBucket{
TokenBucket: settings.GetCpu(),
},
IOReadBandwidth: GroupTokenBucket{
TokenBucket: settings.GetIoRead(),
},
IOWriteBandwidth: GroupTokenBucket{
TokenBucket: settings.GetIoWrite(),
},
CPU: NewGroupTokenBucket(settings.GetCpu()),
IOReadBandwidth: NewGroupTokenBucket(settings.GetIoRead()),
IOWriteBandwidth: NewGroupTokenBucket(settings.GetIoWrite()),
}
rg.ResourceSettings = resourceSettings
}
}
return rg
}

// UpdateRRU updates the RRU of the resource group.
func (rg *ResourceGroup) UpdateRRU(now time.Time) {
rg.Lock()
defer rg.Unlock()
if rg.RUSettings != nil {
rg.RUSettings.RRU.update(now)
}
}

// UpdateWRU updates the WRU of the resource group.
func (rg *ResourceGroup) UpdateWRU(now time.Time) {
rg.Lock()
defer rg.Unlock()
if rg.RUSettings != nil {
rg.RUSettings.WRU.update(now)
}
}

// RequestRRU requests the RRU of the resource group.
func (rg *ResourceGroup) RequestRRU(neededTokens float64, targetPeriodMs uint64) *rmpb.GrantedRUTokenBucket {
rg.Lock()
defer rg.Unlock()
if rg.RUSettings == nil {
return nil
}
tb, trickleTimeMs := rg.RUSettings.RRU.request(neededTokens, targetPeriodMs)
return &rmpb.GrantedRUTokenBucket{Type: rmpb.RequestUnitType_RRU, GrantedTokens: tb, TrickleTimeMs: trickleTimeMs}
}

// RequestWRU requests the WRU of the resource group.
func (rg *ResourceGroup) RequestWRU(neededTokens float64, targetPeriodMs uint64) *rmpb.GrantedRUTokenBucket {
rg.Lock()
defer rg.Unlock()
if rg.RUSettings == nil {
return nil
}
tb, trickleTimeMs := rg.RUSettings.WRU.request(neededTokens, targetPeriodMs)
return &rmpb.GrantedRUTokenBucket{Type: rmpb.RequestUnitType_WRU, GrantedTokens: tb, TrickleTimeMs: trickleTimeMs}
}

// IntoProtoResourceGroup converts a ResourceGroup to a rmpb.ResourceGroup.
func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup {
rg.RLock()
Expand Down