Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_manager: implement the token-consuming by client #6039

Merged
merged 43 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
2c3ed91
roughly impl
HuSharp Feb 23, 2023
b73f7db
make check happy
HuSharp Feb 23, 2023
0c6ceb2
make check happy
HuSharp Feb 23, 2023
0b56890
move sync
HuSharp Feb 24, 2023
384406e
add client id
HuSharp Feb 27, 2023
29fd6a6
Merge branch 'master' into token_client_limit
HuSharp Feb 28, 2023
3ec7cbf
update config
HuSharp Feb 28, 2023
1cebac6
remove stale slot
HuSharp Mar 1, 2023
e72b96e
merge master
HuSharp Mar 2, 2023
a2c1568
address comment
HuSharp Mar 8, 2023
5a88b30
Merge branch 'master' into token_client_limit
HuSharp Mar 8, 2023
ea1db71
merge upstream
HuSharp Mar 10, 2023
922c6a7
make ci happy
HuSharp Mar 10, 2023
6ad83df
Merge branch 'master' into token_client_limit
HuSharp Mar 13, 2023
15a7ad0
update cal
HuSharp Mar 13, 2023
23777f6
Merge branch 'master' into token_client_limit
HuSharp Mar 13, 2023
7466d46
update lock
HuSharp Mar 13, 2023
8b8ecdf
Merge branch 'master' into token_client_limit
HuSharp Mar 14, 2023
dec9006
address comment and update impl
HuSharp Mar 15, 2023
70481b1
update comment
HuSharp Mar 16, 2023
1dca47d
needTokens amplification
HuSharp Mar 16, 2023
8cd6520
merge upstream
HuSharp Mar 16, 2023
7b83512
make check happy
HuSharp Mar 16, 2023
3557221
merge upstream
HuSharp Mar 16, 2023
2664000
Merge branch 'master' into token_client_limit
HuSharp Mar 17, 2023
03be994
revert buffer
HuSharp Mar 17, 2023
8918d1b
add comment
HuSharp Mar 20, 2023
0992131
merge upstream
HuSharp Mar 20, 2023
c92ab35
merge upstream
HuSharp Mar 20, 2023
ed6e033
merge upstream
HuSharp Mar 21, 2023
071fd44
reserve last burst
HuSharp Mar 21, 2023
da7adf3
reserve last burst
HuSharp Mar 21, 2023
19dd323
remove cleanup
HuSharp Mar 21, 2023
2699523
address comment
HuSharp Mar 21, 2023
654702d
Merge branch 'master' into token_client_limit
HuSharp Mar 21, 2023
59dbdd0
address comment
HuSharp Mar 22, 2023
86024e5
update ratio to 0.5
HuSharp Mar 22, 2023
e3c4045
update test with ratio
HuSharp Mar 22, 2023
5d73b28
update test with ratio
HuSharp Mar 22, 2023
20d084c
Merge branch 'master' into token_client_limit
ti-chi-bot Mar 23, 2023
a30f6af
make check happy
HuSharp Mar 23, 2023
b31fffc
remove error
HuSharp Mar 23, 2023
ceec274
remove error
HuSharp Mar 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
address comment
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Mar 8, 2023
commit a2c15685c62ba73eebe9269a8e1a98b018e831d7
2 changes: 1 addition & 1 deletion client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ func (gc *groupCostController) handleRUTokenResponse(resp *rmpb.TokenBucketRespo
}

func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket *rmpb.TokenBucket, trickleTimeMs int64) {
granted := bucket.Tokens
granted := bucket.GetTokens()
if !counter.lastDeadline.IsZero() {
// If last request came with a trickle duration, we may have RUs that were
// not made available to the bucket yet; throw them together with the newly
Expand Down
69 changes: 38 additions & 31 deletions pkg/mcs/resource_manager/server/token_bukets.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type TokenSlot struct {
// assignTokens is the number of tokens in the slot.
assignTokens float64
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
lastAssignTokens float64
assignTokensSum float64
requireTokensSum float64
lastUpdate *time.Time
}

Expand All @@ -79,8 +79,8 @@ type GroupTokenBucketState struct {
mu sync.Mutex
Tokens float64 `json:"tokens,omitempty"`
// ClientUniqueID -> TokenSlot
tokenSlots map[uint64]*TokenSlot
assignTokensSum float64
tokenSlots map[uint64]*TokenSlot
clientConsumptionTokensSum float64

LastUpdate *time.Time `json:"last_update,omitempty"`
Initialized bool `json:"initialized"`
Expand All @@ -100,46 +100,53 @@ func (gts *GroupTokenBucketState) Clone() *GroupTokenBucketState {
lastUpdate = &newLastUpdate
}
return &GroupTokenBucketState{
Tokens: gts.Tokens,
LastUpdate: lastUpdate,
Initialized: gts.Initialized,
tokenSlots: tokenSlots,
assignTokensSum: gts.assignTokensSum,
Tokens: gts.Tokens,
LastUpdate: lastUpdate,
Initialized: gts.Initialized,
tokenSlots: tokenSlots,
clientConsumptionTokensSum: gts.clientConsumptionTokensSum,
}
}

func (gts *GroupTokenBucketState) cleanupAssignTokenSum() {
gts.assignTokensSum = 0
gts.clientConsumptionTokensSum = 0
for _, slot := range gts.tokenSlots {
slot.assignTokensSum = 0
slot.requireTokensSum = 0
}
}

func (gts *GroupTokenBucketState) balanceSlotTokens(
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
clientUniqueID uint64,
settings *rmpb.TokenLimitSettings) {
settings *rmpb.TokenLimitSettings,
consumptionToken float64) {
now := time.Now()
if slot, ok := gts.tokenSlots[clientUniqueID]; !ok {
gts.cleanupAssignTokenSum()
gts.tokenSlots[clientUniqueID] = &TokenSlot{lastUpdate: &now}
} else {
slot.lastUpdate = &now
slot.requireTokensSum += consumptionToken
gts.clientConsumptionTokensSum += consumptionToken

if gts.clientConsumptionTokensSum >= maxAssignTokens {
gts.cleanupAssignTokenSum()
}
}

evenRatio := 1 / float64(len(gts.tokenSlots))
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
if settings.GetBurstLimit() < 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not do this check at the very beginning?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the first thing we need to check is whether the client is new and whether it exceeds the maximum token-size limit; only then do we check the burst limit to evenly distribute the fill rate.
If we put this check at the very beginning, then whenever the client is new we would need to redistribute evenly again.

HuSharp marked this conversation as resolved.
Show resolved Hide resolved
for _, slot := range gts.tokenSlots {
slot.settings = &rmpb.TokenLimitSettings{
FillRate: settings.GetFillRate(),
FillRate: uint64(float64(settings.GetFillRate()) * evenRatio),
BurstLimit: settings.GetBurstLimit(),
}
}
return
}

evenRatio := 1 / float64(len(gts.tokenSlots))
retryLoop:
for clientID, slot := range gts.tokenSlots {
// Clean up those slot that have not been used for a long time.
// Clean up those slots that have not been used for a long time.
if clientID != clientUniqueID && now.Sub(*slot.lastUpdate) >= defaultSlotStalePeriod {
delete(gts.tokenSlots, clientID)
evenRatio = 1 / float64(len(gts.tokenSlots))
Expand All @@ -148,29 +155,22 @@ retryLoop:
}

var ratio float64
if gts.assignTokensSum == 0 || len(gts.tokenSlots) == 1 {
if gts.clientConsumptionTokensSum == 0 || len(gts.tokenSlots) == 1 {
ratio = evenRatio
} else {
ratio = (slot.assignTokensSum/gts.assignTokensSum + evenRatio) / 2
ratio = (slot.requireTokensSum/gts.clientConsumptionTokensSum + evenRatio) / 2
}
var (
fillRate = settings.GetFillRate() * uint64(ratio)
burstLimit = settings.GetBurstLimit() * int64(ratio)
fillRate = float64(settings.GetFillRate()) * ratio
burstLimit = float64(settings.GetBurstLimit()) * ratio
assignToken = gts.Tokens * ratio
)

slot.assignTokens = assignToken
slot.lastAssignTokens = assignToken
slot.settings = &rmpb.TokenLimitSettings{
FillRate: fillRate,
BurstLimit: burstLimit,
}
// update assign token sum
slot.assignTokensSum += assignToken
gts.assignTokensSum += assignToken
if gts.assignTokensSum >= maxAssignTokens {
gts.cleanupAssignTokenSum()
continue retryLoop
FillRate: uint64(fillRate),
BurstLimit: int64(burstLimit),
}
}
}
Expand Down Expand Up @@ -227,7 +227,7 @@ func (gtb *GroupTokenBucket) init(now time.Time) {
}

// updateTokens updates the tokens and settings.
func (gtb *GroupTokenBucket) updateTokens(now time.Time, burstLimit int64, clientUniqueID uint64) {
func (gtb *GroupTokenBucket) updateTokens(now time.Time, burstLimit int64, clientUniqueID uint64, consumptionToken float64) {
gtb.mu.Lock()
defer gtb.mu.Unlock()

Expand All @@ -237,17 +237,24 @@ func (gtb *GroupTokenBucket) updateTokens(now time.Time, burstLimit int64, clien
gtb.Tokens += float64(gtb.Settings.GetFillRate()) * delta.Seconds()
gtb.LastUpdate = &now
}
// Reloan when setting changed
if gtb.settingChanged && gtb.Tokens <= 0 {
gtb.Tokens = 0
}
if burst := float64(burstLimit); burst != 0 && gtb.Tokens > burst {
gtb.Tokens = burst
}
// Balance each slots.
gtb.balanceSlotTokens(clientUniqueID, gtb.Settings)
gtb.balanceSlotTokens(clientUniqueID, gtb.Settings, consumptionToken)
}

// request requests tokens from the group token bucket.
func (gtb *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPeriodMs, clientUniqueID uint64) (*rmpb.TokenBucket, int64) {
func (gtb *GroupTokenBucket) request(now time.Time,
neededTokens float64,
targetPeriodMs, clientUniqueID uint64,
) (*rmpb.TokenBucket, int64) {
// Update tokens
gtb.updateTokens(now, gtb.Settings.GetBurstLimit(), clientUniqueID)
gtb.updateTokens(now, gtb.Settings.GetBurstLimit(), clientUniqueID, neededTokens)
// Serve by specific client
slot, ok := gtb.tokenSlots[clientUniqueID]
if !ok {
Expand Down
4 changes: 4 additions & 0 deletions tests/mcs/resource_manager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
RequestRU: requests,
},
},
ConsumptionSinceLastRequest: &rmpb.Consumption{
RRU: float64(i * 100),
WRU: float64(i * 200),
},
}
reqs.Requests = append(reqs.Requests, req)
}
Expand Down