Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_manager: implement the token-consuming by client #6039

Merged
merged 43 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
2c3ed91
roughly impl
HuSharp Feb 23, 2023
b73f7db
make check happy
HuSharp Feb 23, 2023
0c6ceb2
make check happy
HuSharp Feb 23, 2023
0b56890
move sync
HuSharp Feb 24, 2023
384406e
add client id
HuSharp Feb 27, 2023
29fd6a6
Merge branch 'master' into token_client_limit
HuSharp Feb 28, 2023
3ec7cbf
update config
HuSharp Feb 28, 2023
1cebac6
remove stale slot
HuSharp Mar 1, 2023
e72b96e
merge master
HuSharp Mar 2, 2023
a2c1568
address comment
HuSharp Mar 8, 2023
5a88b30
Merge branch 'master' into token_client_limit
HuSharp Mar 8, 2023
ea1db71
merge upstream
HuSharp Mar 10, 2023
922c6a7
make ci happy
HuSharp Mar 10, 2023
6ad83df
Merge branch 'master' into token_client_limit
HuSharp Mar 13, 2023
15a7ad0
update cal
HuSharp Mar 13, 2023
23777f6
Merge branch 'master' into token_client_limit
HuSharp Mar 13, 2023
7466d46
update lock
HuSharp Mar 13, 2023
8b8ecdf
Merge branch 'master' into token_client_limit
HuSharp Mar 14, 2023
dec9006
address comment and update impl
HuSharp Mar 15, 2023
70481b1
update comment
HuSharp Mar 16, 2023
1dca47d
needTokens amplification
HuSharp Mar 16, 2023
8cd6520
merge upstream
HuSharp Mar 16, 2023
7b83512
make check happy
HuSharp Mar 16, 2023
3557221
merge upstream
HuSharp Mar 16, 2023
2664000
Merge branch 'master' into token_client_limit
HuSharp Mar 17, 2023
03be994
revert buffer
HuSharp Mar 17, 2023
8918d1b
add comment
HuSharp Mar 20, 2023
0992131
merge upstream
HuSharp Mar 20, 2023
c92ab35
merge upstream
HuSharp Mar 20, 2023
ed6e033
merge upstream
HuSharp Mar 21, 2023
071fd44
reserve last burst
HuSharp Mar 21, 2023
da7adf3
reserve last burst
HuSharp Mar 21, 2023
19dd323
remove cleanup
HuSharp Mar 21, 2023
2699523
address comment
HuSharp Mar 21, 2023
654702d
Merge branch 'master' into token_client_limit
HuSharp Mar 21, 2023
59dbdd0
address comment
HuSharp Mar 22, 2023
86024e5
update ratio to 0.5
HuSharp Mar 22, 2023
e3c4045
update test with ratio
HuSharp Mar 22, 2023
5d73b28
update test with ratio
HuSharp Mar 22, 2023
20d084c
Merge branch 'master' into token_client_limit
ti-chi-bot Mar 23, 2023
a30f6af
make check happy
HuSharp Mar 23, 2023
b31fffc
remove error
HuSharp Mar 23, 2023
ceec274
remove error
HuSharp Mar 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
}()
}

// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs wait some time.
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
func (c *ResourceGroupsController) OnRequestWait(
ctx context.Context, resourceGroupName string, info RequestInfo,
) (*rmpb.Consumption, error) {
Expand Down Expand Up @@ -837,13 +837,13 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
counter.inDegradedMode = false
var cfg tokenBucketReconfigureArgs
cfg.NewBurst = bucket.GetSettings().GetBurstLimit()
// when trickleTimeMs equals zero, server has enough tokens and does not need to
// When trickleTimeMs equals zero, server has enough tokens and does not need to
// limit client consume token. So all token is granted to client right now.
if trickleTimeMs == 0 {
cfg.NewTokens = granted
cfg.NewRate = float64(bucket.GetSettings().FillRate)
counter.lastDeadline = time.Time{}
cfg.NotifyThreshold = math.Min((granted+counter.limiter.AvailableTokens(gc.run.now)), counter.avgRUPerSec*float64(defaultTargetPeriod)) * notifyFraction
cfg.NotifyThreshold = math.Min(granted+counter.limiter.AvailableTokens(gc.run.now), counter.avgRUPerSec*float64(defaultTargetPeriod)) * notifyFraction
// In the non-trickle case, clients can be allowed to accumulate more tokens.
if cfg.NewBurst >= 0 {
cfg.NewBurst = 0
Expand Down
2 changes: 1 addition & 1 deletion client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func Every(interval time.Duration) Limit {
//
// Some changes about burst(b):
// - If b == 0, that means the limiter is unlimited capacity. default use in resource controller (burst with a rate within an unlimited capacity).
// - If b < 0, that means the limiter is unlimited capacity and r is ignored, can be seen as r == Inf (burst within a unlimited capacity).
// - If b < 0, that means the limiter is unlimited capacity and r is ignored, can be seen as r == Inf (burst within an unlimited capacity).
// - If b > 0, that means the limiter is limited capacity.
type Limiter struct {
mu sync.Mutex
Expand Down
3 changes: 1 addition & 2 deletions pkg/mcs/resource_manager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ func (s *Service) postResourceGroup(c *gin.Context) {
c.String(http.StatusBadRequest, err.Error())
return
}
nGroup := rmserver.FromProtoResourceGroup(&group)
if err := s.manager.AddResourceGroup(nGroup); err != nil {
if err := s.manager.AddResourceGroup(&group); err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/resource_manager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ func (s *Service) AddResourceGroup(ctx context.Context, req *rmpb.PutResourceGro
if err := s.checkServing(); err != nil {
return nil, err
}
rg := FromProtoResourceGroup(req.GetGroup())
err := s.manager.AddResourceGroup(rg)
err := s.manager.AddResourceGroup(req.GetGroup())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -180,6 +179,7 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu
return err
}
targetPeriodMs := request.GetTargetRequestPeriodMs()
clientUniqueID := request.GetClientUniqueId()
resps := &rmpb.TokenBucketsResponse{}
for _, req := range request.Requests {
resourceGroupName := req.GetResourceGroupName()
Expand All @@ -203,7 +203,7 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu
var tokens *rmpb.GrantedRUTokenBucket
for _, re := range req.GetRuItems().GetRequestRU() {
if re.Type == rmpb.RequestUnitType_RU {
tokens = rg.RequestRU(now, re.Value, targetPeriodMs)
tokens = rg.RequestRU(now, re.Value, targetPeriodMs, clientUniqueID)
}
if tokens == nil {
continue
Expand Down
19 changes: 13 additions & 6 deletions pkg/mcs/resource_manager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,22 @@ func (m *Manager) Init(ctx context.Context) {
}

// AddResourceGroup puts a resource group.
func (m *Manager) AddResourceGroup(group *ResourceGroup) error {
func (m *Manager) AddResourceGroup(grouppb *rmpb.ResourceGroup) error {
// Check the name.
if len(grouppb.Name) == 0 || len(grouppb.Name) > 32 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check the name valid at the top.

return errors.New("invalid resource group name, the length should be in [1,32]")
}
// Check the Priority.
if grouppb.GetPriority() > 16 {
return errors.New("invalid resource group priority, the value should be in [0,16]")
}
m.RLock()
_, ok := m.groups[group.Name]
_, ok := m.groups[grouppb.Name]
m.RUnlock()
if ok {
return errors.New("this group already exists")
}
err := group.CheckAndInit()
CabinfeverB marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
group := FromProtoResourceGroup(grouppb)
m.Lock()
defer m.Unlock()
if err := group.persistSettings(m.storage); err != nil {
Expand Down Expand Up @@ -247,7 +252,9 @@ func (m *Manager) persistResourceGroupRunningState() {
group, ok := m.groups[keys[idx]]
m.RUnlock()
if ok {
m.Lock()
group.persistStates(m.storage)
m.Unlock()
}
}
}
Expand Down
119 changes: 23 additions & 96 deletions pkg/mcs/resource_manager/server/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,12 @@ type ResourceGroup struct {
Mode rmpb.GroupMode `json:"mode"`
// RU settings
RUSettings *RequestUnitSettings `json:"r_u_settings,omitempty"`
// raw resource settings
RawResourceSettings *RawResourceSettings `json:"raw_resource_settings,omitempty"`
Priority uint32 `json:"priority"`
Priority uint32 `json:"priority"`
}

// RequestUnitSettings is the definition of the RU settings.
type RequestUnitSettings struct {
RU GroupTokenBucket `json:"r_u,omitempty"`
RU *GroupTokenBucket `json:"r_u,omitempty"`
}

// NewRequestUnitSettings creates a new RequestUnitSettings with the given token bucket.
Expand All @@ -51,22 +49,6 @@ func NewRequestUnitSettings(tokenBucket *rmpb.TokenBucket) *RequestUnitSettings
}
}

// RawResourceSettings is the definition of the native resource settings.
type RawResourceSettings struct {
CPU GroupTokenBucket `json:"cpu,omitempty"`
IOReadBandwidth GroupTokenBucket `json:"io_read_bandwidth,omitempty"`
IOWriteBandwidth GroupTokenBucket `json:"io_write_bandwidth,omitempty"`
}

// NewRawResourceSettings creates a new RawResourceSettings with the given token buckets.
func NewRawResourceSettings(cpu, ioRead, ioWrite *rmpb.TokenBucket) *RawResourceSettings {
return &RawResourceSettings{
CPU: NewGroupTokenBucket(cpu),
IOReadBandwidth: NewGroupTokenBucket(ioRead),
IOWriteBandwidth: NewGroupTokenBucket(ioWrite),
}
}

func (rg *ResourceGroup) String() string {
res, err := json.Marshal(rg)
if err != nil {
Expand All @@ -93,42 +75,13 @@ func (rg *ResourceGroup) Copy() *ResourceGroup {
return &newRG
}

// CheckAndInit checks the validity of the resource group and initializes the default values if not setting.
// Only used to initialize the resource group when creating.
func (rg *ResourceGroup) CheckAndInit() error {
if len(rg.Name) == 0 || len(rg.Name) > 32 {
return errors.New("invalid resource group name, the length should be in [1,32]")
}
if rg.Priority > 16 {
return errors.New("invalid resource group priority, the value should be in [0,16]")
}
switch rg.Mode {
case rmpb.GroupMode_RUMode:
if rg.RUSettings == nil {
rg.RUSettings = NewRequestUnitSettings(nil)
}
if rg.RawResourceSettings != nil {
return errors.New("invalid resource group settings, RU mode should not set raw resource settings")
}
case rmpb.GroupMode_RawMode:
if rg.RawResourceSettings == nil {
rg.RawResourceSettings = NewRawResourceSettings(nil, nil, nil)
}
if rg.RUSettings != nil {
return errors.New("invalid resource group settings, raw mode should not set RU settings")
}
default:
return errors.New("invalid resource group mode")
}
return nil
}

// PatchSettings patches the resource group settings.
// Only used to patch the resource group when updating.
// Note: the tokens is the delta value to patch.
func (rg *ResourceGroup) PatchSettings(metaGroup *rmpb.ResourceGroup) error {
rg.Lock()
defer rg.Unlock()

if metaGroup.GetMode() != rg.Mode {
return errors.New("only support reconfigure in same mode, maybe you should delete and create a new one")
}
Expand All @@ -138,17 +91,14 @@ func (rg *ResourceGroup) PatchSettings(metaGroup *rmpb.ResourceGroup) error {
rg.Priority = metaGroup.Priority
switch rg.Mode {
case rmpb.GroupMode_RUMode:
if metaGroup.GetRUSettings() == nil {
settings := metaGroup.GetRUSettings()
if settings == nil {
return errors.New("invalid resource group settings, RU mode should set RU settings")
}
rg.RUSettings.RU.patch(metaGroup.GetRUSettings().GetRU())
rg.RUSettings.RU.patch(settings.GetRU())
log.Info("patch resource group ru settings", zap.String("name", rg.Name), zap.Any("settings", settings))
case rmpb.GroupMode_RawMode:
if metaGroup.GetRawResourceSettings() == nil {
return errors.New("invalid resource group settings, raw mode should set resource settings")
}
rg.RawResourceSettings.CPU.patch(metaGroup.GetRawResourceSettings().GetCpu())
rg.RawResourceSettings.IOReadBandwidth.patch(metaGroup.GetRawResourceSettings().GetIoRead())
rg.RawResourceSettings.IOWriteBandwidth.patch(metaGroup.GetRawResourceSettings().GetIoWrite())
panic("no implementation")
}
log.Info("patch resource group settings", zap.String("name", rg.Name), zap.String("settings", rg.String()))
return nil
Expand All @@ -163,17 +113,13 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup {
}
switch group.GetMode() {
case rmpb.GroupMode_RUMode:
if settings := group.GetRUSettings(); settings != nil {
rg.RUSettings = NewRequestUnitSettings(settings.GetRU())
if group.GetRUSettings() == nil {
rg.RUSettings = NewRequestUnitSettings(nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this cause some unexpect behavior?

Copy link
Member Author

@HuSharp HuSharp Mar 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found CheckAndInit has redundant code and function with FromProtoResourceGroup,
So i replace CheckAndInit with FromProtoResourceGroup.

  • for CheckAndInit can keep original logic
  • for other way which call FromProtoResourceGroup, if set nil, just keep it.
    And RequestRU will check it.

So i think it will be ok.

} else {
rg.RUSettings = NewRequestUnitSettings(group.GetRUSettings().GetRU())
}
case rmpb.GroupMode_RawMode:
if settings := group.GetRawResourceSettings(); settings != nil {
rg.RawResourceSettings = NewRawResourceSettings(
settings.GetCpu(),
settings.GetIoRead(),
settings.GetIoWrite(),
)
}
panic("no implementation")
}
return rg
}
Expand All @@ -182,21 +128,23 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup {
func (rg *ResourceGroup) RequestRU(
now time.Time,
neededTokens float64,
targetPeriodMs uint64,
targetPeriodMs, clientUniqueID uint64,
) *rmpb.GrantedRUTokenBucket {
rg.Lock()
defer rg.Unlock()

if rg.RUSettings == nil || rg.RUSettings.RU.Settings == nil {
return nil
}
tb, trickleTimeMs := rg.RUSettings.RU.request(now, neededTokens, targetPeriodMs)
tb, trickleTimeMs := rg.RUSettings.RU.request(now, neededTokens, targetPeriodMs, clientUniqueID)
return &rmpb.GrantedRUTokenBucket{GrantedTokens: tb, TrickleTimeMs: trickleTimeMs}
}

// IntoProtoResourceGroup converts a ResourceGroup to a rmpb.ResourceGroup.
func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup {
rg.RLock()
defer rg.RUnlock()

switch rg.Mode {
case rmpb.GroupMode_RUMode: // RU mode
group := &rmpb.ResourceGroup{
Expand All @@ -209,17 +157,7 @@ func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup {
}
return group
case rmpb.GroupMode_RawMode: // Raw mode
group := &rmpb.ResourceGroup{
Name: rg.Name,
Mode: rmpb.GroupMode_RawMode,
Priority: rg.Priority,
RawResourceSettings: &rmpb.GroupRawResourceSettings{
Cpu: rg.RawResourceSettings.CPU.GetTokenBucket(),
IoRead: rg.RawResourceSettings.IOReadBandwidth.GetTokenBucket(),
IoWrite: rg.RawResourceSettings.IOWriteBandwidth.GetTokenBucket(),
},
}
return group
panic("no implementation")
}
return nil
}
Expand All @@ -245,19 +183,15 @@ type GroupStates struct {
func (rg *ResourceGroup) GetGroupStates() *GroupStates {
rg.RLock()
defer rg.RUnlock()

switch rg.Mode {
case rmpb.GroupMode_RUMode: // RU mode
tokens := &GroupStates{
RU: rg.RUSettings.RU.GroupTokenBucketState.Clone(),
}
return tokens
case rmpb.GroupMode_RawMode: // Raw mode
tokens := &GroupStates{
CPU: rg.RawResourceSettings.CPU.GroupTokenBucketState.Clone(),
IORead: rg.RawResourceSettings.IOReadBandwidth.GroupTokenBucketState.Clone(),
IOWrite: rg.RawResourceSettings.IOWriteBandwidth.GroupTokenBucketState.Clone(),
}
return tokens
panic("no implementation")
}
return nil
}
Expand All @@ -267,18 +201,11 @@ func (rg *ResourceGroup) SetStatesIntoResourceGroup(states *GroupStates) {
switch rg.Mode {
case rmpb.GroupMode_RUMode:
if state := states.RU; state != nil {
rg.RUSettings.RU.GroupTokenBucketState = *state
rg.RUSettings.RU.setState(state)
log.Debug("update group token bucket state", zap.String("name", rg.Name), zap.Any("state", state))
}
case rmpb.GroupMode_RawMode:
if state := states.CPU; state != nil {
rg.RawResourceSettings.CPU.GroupTokenBucketState = *state
}
if state := states.IORead; state != nil {
rg.RawResourceSettings.IOReadBandwidth.GroupTokenBucketState = *state
}
if state := states.IOWrite; state != nil {
rg.RawResourceSettings.IOWriteBandwidth.GroupTokenBucketState = *state
}
panic("no implementation")
}
}

Expand Down
34 changes: 2 additions & 32 deletions pkg/mcs/resource_manager/server/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ import (

func TestPatchResourceGroup(t *testing.T) {
re := require.New(t)
rg1 := &ResourceGroup{Name: "test", Mode: rmpb.GroupMode_RUMode, RUSettings: &RequestUnitSettings{}}
err := rg1.CheckAndInit()
re.NoError(err)
rg := &ResourceGroup{Name: "test", Mode: rmpb.GroupMode_RUMode, RUSettings: NewRequestUnitSettings(nil)}
testCaseRU := []struct {
patchJSONString string
expectJSONString string
Expand All @@ -26,40 +24,12 @@ func TestPatchResourceGroup(t *testing.T) {
}

for _, ca := range testCaseRU {
rg := rg1.Copy()
patch := &rmpb.ResourceGroup{}
err := json.Unmarshal([]byte(ca.patchJSONString), patch)
re.NoError(err)
err = rg.PatchSettings(patch)
re.NoError(err)
res, err := json.Marshal(rg)
re.NoError(err)
re.Equal(ca.expectJSONString, string(res))
}

rg2 := &ResourceGroup{Name: "test", Mode: rmpb.GroupMode_RawMode, RawResourceSettings: &RawResourceSettings{}}
err = rg2.CheckAndInit()
re.NoError(err)
testCaseResource := []struct {
patchJSONString string
expectJSONString string
}{
{`{"name":"test", "mode":2, "raw_resource_settings": {"cpu":{"settings":{"fill_rate": 200000}}}}`,
`{"name":"test","mode":2,"raw_resource_settings":{"cpu":{"settings":{"fill_rate":200000},"state":{"initialized":false}},"io_read_bandwidth":{"state":{"initialized":false}},"io_write_bandwidth":{"state":{"initialized":false}}},"priority":0}`},
{`{"name":"test", "mode":2, "raw_resource_settings": {"io_read":{"settings":{"fill_rate": 200000,"burst_limit":1000000}}}}`,
`{"name":"test","mode":2,"raw_resource_settings":{"cpu":{"state":{"initialized":false}},"io_read_bandwidth":{"settings":{"fill_rate":200000,"burst_limit":1000000},"state":{"initialized":false}},"io_write_bandwidth":{"state":{"initialized":false}}},"priority":0}`},
{`{"name":"test", "mode":2, "raw_resource_settings": {"io_write":{"settings":{"fill_rate": 200000}}}, "priority": 16 }`,
`{"name":"test","mode":2,"raw_resource_settings":{"cpu":{"state":{"initialized":false}},"io_read_bandwidth":{"state":{"initialized":false}},"io_write_bandwidth":{"settings":{"fill_rate":200000},"state":{"initialized":false}}},"priority":16}`},
}

for _, ca := range testCaseResource {
rg := rg2.Copy()
patch := &rmpb.ResourceGroup{}
err := json.Unmarshal([]byte(ca.patchJSONString), patch)
re.NoError(err)
err = rg.PatchSettings(patch)
re.NoError(err)
res, err := json.Marshal(rg)
res, err := json.Marshal(rg.Copy())
re.NoError(err)
re.Equal(ca.expectJSONString, string(res))
}
Expand Down
Loading