-
Notifications
You must be signed in to change notification settings - Fork 727
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
resource_manager: implement the token-consuming by client #6039
Changes from 38 commits
2c3ed91
b73f7db
0c6ceb2
0b56890
384406e
29fd6a6
3ec7cbf
1cebac6
e72b96e
a2c1568
5a88b30
ea1db71
922c6a7
6ad83df
15a7ad0
23777f6
7466d46
8b8ecdf
dec9006
70481b1
1dca47d
8cd6520
7b83512
3557221
2664000
03be994
8918d1b
0992131
c92ab35
ed6e033
071fd44
da7adf3
19dd323
2699523
654702d
59dbdd0
86024e5
e3c4045
5d73b28
20d084c
a30f6af
b31fffc
ceec274
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,14 +34,12 @@ type ResourceGroup struct { | |
Mode rmpb.GroupMode `json:"mode"` | ||
// RU settings | ||
RUSettings *RequestUnitSettings `json:"r_u_settings,omitempty"` | ||
// raw resource settings | ||
RawResourceSettings *RawResourceSettings `json:"raw_resource_settings,omitempty"` | ||
Priority uint32 `json:"priority"` | ||
Priority uint32 `json:"priority"` | ||
} | ||
|
||
// RequestUnitSettings is the definition of the RU settings. | ||
type RequestUnitSettings struct { | ||
RU GroupTokenBucket `json:"r_u,omitempty"` | ||
RU *GroupTokenBucket `json:"r_u,omitempty"` | ||
} | ||
|
||
// NewRequestUnitSettings creates a new RequestUnitSettings with the given token bucket. | ||
|
@@ -51,22 +49,6 @@ func NewRequestUnitSettings(tokenBucket *rmpb.TokenBucket) *RequestUnitSettings | |
} | ||
} | ||
|
||
// RawResourceSettings is the definition of the native resource settings. | ||
type RawResourceSettings struct { | ||
CPU GroupTokenBucket `json:"cpu,omitempty"` | ||
IOReadBandwidth GroupTokenBucket `json:"io_read_bandwidth,omitempty"` | ||
IOWriteBandwidth GroupTokenBucket `json:"io_write_bandwidth,omitempty"` | ||
} | ||
|
||
// NewRawResourceSettings creates a new RawResourceSettings with the given token buckets. | ||
func NewRawResourceSettings(cpu, ioRead, ioWrite *rmpb.TokenBucket) *RawResourceSettings { | ||
return &RawResourceSettings{ | ||
CPU: NewGroupTokenBucket(cpu), | ||
IOReadBandwidth: NewGroupTokenBucket(ioRead), | ||
IOWriteBandwidth: NewGroupTokenBucket(ioWrite), | ||
} | ||
} | ||
|
||
func (rg *ResourceGroup) String() string { | ||
res, err := json.Marshal(rg) | ||
if err != nil { | ||
|
@@ -93,42 +75,13 @@ func (rg *ResourceGroup) Copy() *ResourceGroup { | |
return &newRG | ||
} | ||
|
||
// CheckAndInit checks the validity of the resource group and initializes the default values if not setting. | ||
// Only used to initialize the resource group when creating. | ||
func (rg *ResourceGroup) CheckAndInit() error { | ||
if len(rg.Name) == 0 || len(rg.Name) > 32 { | ||
return errors.New("invalid resource group name, the length should be in [1,32]") | ||
} | ||
if rg.Priority > 16 { | ||
return errors.New("invalid resource group priority, the value should be in [0,16]") | ||
} | ||
switch rg.Mode { | ||
case rmpb.GroupMode_RUMode: | ||
if rg.RUSettings == nil { | ||
rg.RUSettings = NewRequestUnitSettings(nil) | ||
} | ||
if rg.RawResourceSettings != nil { | ||
return errors.New("invalid resource group settings, RU mode should not set raw resource settings") | ||
} | ||
case rmpb.GroupMode_RawMode: | ||
if rg.RawResourceSettings == nil { | ||
rg.RawResourceSettings = NewRawResourceSettings(nil, nil, nil) | ||
} | ||
if rg.RUSettings != nil { | ||
return errors.New("invalid resource group settings, raw mode should not set RU settings") | ||
} | ||
default: | ||
return errors.New("invalid resource group mode") | ||
} | ||
return nil | ||
} | ||
|
||
// PatchSettings patches the resource group settings. | ||
// Only used to patch the resource group when updating. | ||
// Note: the tokens is the delta value to patch. | ||
func (rg *ResourceGroup) PatchSettings(metaGroup *rmpb.ResourceGroup) error { | ||
rg.Lock() | ||
defer rg.Unlock() | ||
|
||
if metaGroup.GetMode() != rg.Mode { | ||
return errors.New("only support reconfigure in same mode, maybe you should delete and create a new one") | ||
} | ||
|
@@ -138,17 +91,14 @@ func (rg *ResourceGroup) PatchSettings(metaGroup *rmpb.ResourceGroup) error { | |
rg.Priority = metaGroup.Priority | ||
switch rg.Mode { | ||
case rmpb.GroupMode_RUMode: | ||
if metaGroup.GetRUSettings() == nil { | ||
settings := metaGroup.GetRUSettings() | ||
if settings == nil { | ||
return errors.New("invalid resource group settings, RU mode should set RU settings") | ||
} | ||
rg.RUSettings.RU.patch(metaGroup.GetRUSettings().GetRU()) | ||
rg.RUSettings.RU.patch(settings.GetRU()) | ||
log.Info("patch resource group ru settings", zap.String("name", rg.Name), zap.Any("settings", settings)) | ||
case rmpb.GroupMode_RawMode: | ||
if metaGroup.GetRawResourceSettings() == nil { | ||
return errors.New("invalid resource group settings, raw mode should set resource settings") | ||
} | ||
rg.RawResourceSettings.CPU.patch(metaGroup.GetRawResourceSettings().GetCpu()) | ||
rg.RawResourceSettings.IOReadBandwidth.patch(metaGroup.GetRawResourceSettings().GetIoRead()) | ||
rg.RawResourceSettings.IOWriteBandwidth.patch(metaGroup.GetRawResourceSettings().GetIoWrite()) | ||
panic("no implementation") | ||
} | ||
log.Info("patch resource group settings", zap.String("name", rg.Name), zap.String("settings", rg.String())) | ||
return nil | ||
|
@@ -163,17 +113,13 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup { | |
} | ||
switch group.GetMode() { | ||
case rmpb.GroupMode_RUMode: | ||
if settings := group.GetRUSettings(); settings != nil { | ||
rg.RUSettings = NewRequestUnitSettings(settings.GetRU()) | ||
if group.GetRUSettings() == nil { | ||
rg.RUSettings = NewRequestUnitSettings(nil) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this cause some unexpect behavior? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I found CheckAndInit has redundant code and function with FromProtoResourceGroup,
So i think it will be ok. |
||
} else { | ||
rg.RUSettings = NewRequestUnitSettings(group.GetRUSettings().GetRU()) | ||
} | ||
case rmpb.GroupMode_RawMode: | ||
if settings := group.GetRawResourceSettings(); settings != nil { | ||
rg.RawResourceSettings = NewRawResourceSettings( | ||
settings.GetCpu(), | ||
settings.GetIoRead(), | ||
settings.GetIoWrite(), | ||
) | ||
} | ||
panic("no implementation") | ||
} | ||
return rg | ||
} | ||
|
@@ -182,21 +128,23 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup { | |
func (rg *ResourceGroup) RequestRU( | ||
now time.Time, | ||
neededTokens float64, | ||
targetPeriodMs uint64, | ||
targetPeriodMs, clientUniqueID uint64, | ||
) *rmpb.GrantedRUTokenBucket { | ||
rg.Lock() | ||
defer rg.Unlock() | ||
|
||
if rg.RUSettings == nil || rg.RUSettings.RU.Settings == nil { | ||
return nil | ||
} | ||
tb, trickleTimeMs := rg.RUSettings.RU.request(now, neededTokens, targetPeriodMs) | ||
tb, trickleTimeMs := rg.RUSettings.RU.request(now, neededTokens, targetPeriodMs, clientUniqueID) | ||
return &rmpb.GrantedRUTokenBucket{GrantedTokens: tb, TrickleTimeMs: trickleTimeMs} | ||
} | ||
|
||
// IntoProtoResourceGroup converts a ResourceGroup to a rmpb.ResourceGroup. | ||
func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup { | ||
rg.RLock() | ||
defer rg.RUnlock() | ||
|
||
switch rg.Mode { | ||
case rmpb.GroupMode_RUMode: // RU mode | ||
group := &rmpb.ResourceGroup{ | ||
|
@@ -209,17 +157,7 @@ func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup { | |
} | ||
return group | ||
case rmpb.GroupMode_RawMode: // Raw mode | ||
group := &rmpb.ResourceGroup{ | ||
Name: rg.Name, | ||
Mode: rmpb.GroupMode_RawMode, | ||
Priority: rg.Priority, | ||
RawResourceSettings: &rmpb.GroupRawResourceSettings{ | ||
Cpu: rg.RawResourceSettings.CPU.GetTokenBucket(), | ||
IoRead: rg.RawResourceSettings.IOReadBandwidth.GetTokenBucket(), | ||
IoWrite: rg.RawResourceSettings.IOWriteBandwidth.GetTokenBucket(), | ||
}, | ||
} | ||
return group | ||
panic("no implementation") | ||
} | ||
return nil | ||
} | ||
|
@@ -245,19 +183,15 @@ type GroupStates struct { | |
func (rg *ResourceGroup) GetGroupStates() *GroupStates { | ||
rg.RLock() | ||
defer rg.RUnlock() | ||
|
||
switch rg.Mode { | ||
case rmpb.GroupMode_RUMode: // RU mode | ||
tokens := &GroupStates{ | ||
RU: rg.RUSettings.RU.GroupTokenBucketState.Clone(), | ||
} | ||
return tokens | ||
case rmpb.GroupMode_RawMode: // Raw mode | ||
tokens := &GroupStates{ | ||
CPU: rg.RawResourceSettings.CPU.GroupTokenBucketState.Clone(), | ||
IORead: rg.RawResourceSettings.IOReadBandwidth.GroupTokenBucketState.Clone(), | ||
IOWrite: rg.RawResourceSettings.IOWriteBandwidth.GroupTokenBucketState.Clone(), | ||
} | ||
return tokens | ||
panic("no implementation") | ||
} | ||
return nil | ||
} | ||
|
@@ -267,18 +201,11 @@ func (rg *ResourceGroup) SetStatesIntoResourceGroup(states *GroupStates) { | |
switch rg.Mode { | ||
case rmpb.GroupMode_RUMode: | ||
if state := states.RU; state != nil { | ||
rg.RUSettings.RU.GroupTokenBucketState = *state | ||
rg.RUSettings.RU.setState(state) | ||
log.Debug("update group token bucket state", zap.String("name", rg.Name), zap.Any("state", state)) | ||
} | ||
case rmpb.GroupMode_RawMode: | ||
if state := states.CPU; state != nil { | ||
rg.RawResourceSettings.CPU.GroupTokenBucketState = *state | ||
} | ||
if state := states.IORead; state != nil { | ||
rg.RawResourceSettings.IOReadBandwidth.GroupTokenBucketState = *state | ||
} | ||
if state := states.IOWrite; state != nil { | ||
rg.RawResourceSettings.IOWriteBandwidth.GroupTokenBucketState = *state | ||
} | ||
panic("no implementation") | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check the name valid at the top.