Skip to content

Commit

Permalink
client: Count resource penalty for resource control (#6336)
Browse files Browse the repository at this point in the history
close #6335

Count the delta resource consumption of the resource group that has been completed at all stores between the previous request to this store and the current request.

Signed-off-by: Connor1996 <zbk602423539@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
Connor1996 and ti-chi-bot authored Apr 19, 2023
1 parent 1c360b6 commit 9e7b8e8
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 28 deletions.
47 changes: 40 additions & 7 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
// ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store.
type ResourceGroupKVInterceptor interface {
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, error)
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, error)
// OnResponse is used to consume tokens after receiving response
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
}
Expand Down Expand Up @@ -404,11 +404,11 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
func (c *ResourceGroupsController) OnRequestWait(
ctx context.Context, resourceGroupName string, info RequestInfo,
) (*rmpb.Consumption, error) {
) (*rmpb.Consumption, *rmpb.Consumption, error) {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
failedRequestCounter.WithLabelValues(resourceGroupName).Inc()
return nil, err
return nil, nil, err
}
return gc.onRequestWait(ctx, info)
}
Expand Down Expand Up @@ -440,7 +440,9 @@ type groupCostController struct {

mu struct {
sync.Mutex
consumption *rmpb.Consumption
consumption *rmpb.Consumption
storeCounter map[uint64]*rmpb.Consumption
globalCounter *rmpb.Consumption
}

// fast path to make once token limit with un-limit burst.
Expand Down Expand Up @@ -548,6 +550,8 @@ func newGroupCostController(
}

gc.mu.consumption = &rmpb.Consumption{}
gc.mu.storeCounter = make(map[uint64]*rmpb.Consumption)
gc.mu.globalCounter = &rmpb.Consumption{}
return gc, nil
}

Expand Down Expand Up @@ -991,14 +995,16 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 {

func (gc *groupCostController) onRequestWait(
ctx context.Context, info RequestInfo,
) (*rmpb.Consumption, error) {
) (*rmpb.Consumption, *rmpb.Consumption, error) {
delta := &rmpb.Consumption{}
for _, calc := range gc.calculators {
calc.BeforeKVRequest(delta, info)
}

gc.mu.Lock()
add(gc.mu.consumption, delta)
gc.mu.Unlock()

if !gc.burstable.Load() {
var err error
now := time.Now()
Expand Down Expand Up @@ -1036,12 +1042,27 @@ func (gc *groupCostController) onRequestWait(
gc.mu.Lock()
sub(gc.mu.consumption, delta)
gc.mu.Unlock()
return nil, err
return nil, nil, err
} else {
gc.successfulRequestDuration.Observe(d.Seconds())
}
}
return delta, nil

gc.mu.Lock()
// Calculate the penalty of the store
penalty := &rmpb.Consumption{}
if storeCounter, exist := gc.mu.storeCounter[info.StoreID()]; exist {
*penalty = *gc.mu.globalCounter
sub(penalty, storeCounter)
} else {
gc.mu.storeCounter[info.StoreID()] = &rmpb.Consumption{}
}
// More accurately, it should be reset when the request succeeds. But that would cause all concurrent requests to piggyback a large delta, which inflates the penalty.
// So reset it directly here, as failures are rare.
*gc.mu.storeCounter[info.StoreID()] = *gc.mu.globalCounter
gc.mu.Unlock()

return delta, penalty, nil
}

func (gc *groupCostController) onResponse(
Expand All @@ -1067,9 +1088,21 @@ func (gc *groupCostController) onResponse(
}
}
}

gc.mu.Lock()
// Record the consumption of the request
add(gc.mu.consumption, delta)
// Record the consumption of the request by store
count := &rmpb.Consumption{}
*count = *delta
// Since the penalty is only counted when the request is completed, the write cost added in `BeforeKVRequest` needs to be calculated again here.
for _, calc := range gc.calculators {
calc.BeforeKVRequest(count, req)
}
add(gc.mu.storeCounter[req.StoreID()], count)
add(gc.mu.globalCounter, count)
gc.mu.Unlock()

return delta, nil
}

Expand Down
2 changes: 1 addition & 1 deletion client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestRequestAndResponseConsumption(t *testing.T) {
kvCalculator := gc.getKVCalculator()
for idx, testCase := range testCases {
caseNum := fmt.Sprintf("case %d", idx)
consumption, err := gc.onRequestWait(context.TODO(), testCase.req)
consumption, _, err := gc.onRequestWait(context.TODO(), testCase.req)
re.NoError(err, caseNum)
expectedConsumption := &rmpb.Consumption{}
if testCase.req.IsWrite() {
Expand Down
1 change: 1 addition & 0 deletions client/resource_group/controller/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type RequestUnit float64
type RequestInfo interface {
	// IsWrite reports whether the request is a write request.
	IsWrite() bool
	// WriteBytes returns the number of bytes written by the request.
	WriteBytes() uint64
	// StoreID returns the ID of the store the request is sent to; it is used
	// by the controller to track per-store consumption for penalty accounting.
	StoreID() uint64
}

// ResponseInfo is the interface of the response information provider. A response should be
Expand Down
9 changes: 8 additions & 1 deletion client/resource_group/controller/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ import "time"
type TestRequestInfo struct {
	isWrite    bool   // whether this is a write request (reported by IsWrite)
	writeBytes uint64 // payload size reported by WriteBytes
	storeID    uint64 // target store ID reported by StoreID
}

// NewTestRequestInfo creates a new TestRequestInfo.
func NewTestRequestInfo(isWrite bool, writeBytes uint64) *TestRequestInfo {
func NewTestRequestInfo(isWrite bool, writeBytes uint64, storeID uint64) *TestRequestInfo {
return &TestRequestInfo{
isWrite: isWrite,
writeBytes: writeBytes,
storeID: storeID,
}
}

Expand All @@ -44,6 +46,11 @@ func (tri *TestRequestInfo) WriteBytes() uint64 {
return tri.writeBytes
}

// StoreID implements the RequestInfo interface, returning the ID of the
// store this request targets.
func (tri *TestRequestInfo) StoreID() uint64 {
	return tri.storeID
}

// TestResponseInfo is used to test the response info interface.
type TestResponseInfo struct {
readBytes uint64
Expand Down
123 changes: 104 additions & 19 deletions tests/integrations/mcs/resource_manager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ func (suite *resourceManagerClientTestSuite) TearDownSuite() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/resource_manager/server/enableDegradedMode"))
}

// TearDownTest removes all resource groups created by the test case that just
// ran, so that test cases in this suite do not interfere with each other.
func (suite *resourceManagerClientTestSuite) TearDownTest() {
	suite.cleanupResourceGroups()
}

func (suite *resourceManagerClientTestSuite) cleanupResourceGroups() {
cli := suite.client
groups, err := cli.ListResourceGroups(suite.ctx)
Expand Down Expand Up @@ -258,11 +262,11 @@ type tokenConsumptionPerSecond struct {
}

func (t tokenConsumptionPerSecond) makeReadRequest() *controller.TestRequestInfo {
return controller.NewTestRequestInfo(false, 0)
return controller.NewTestRequestInfo(false, 0, 0)
}

func (t tokenConsumptionPerSecond) makeWriteRequest() *controller.TestRequestInfo {
return controller.NewTestRequestInfo(true, uint64(t.wruTokensAtATime-1))
return controller.NewTestRequestInfo(true, uint64(t.wruTokensAtATime-1), 0)
}

func (t tokenConsumptionPerSecond) makeReadResponse() *controller.TestResponseInfo {
Expand Down Expand Up @@ -340,9 +344,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
rres := cas.tcs[i].makeReadResponse()
wres := cas.tcs[i].makeWriteResponse()
startTime := time.Now()
_, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
_, _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
re.NoError(err)
_, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
_, _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
re.NoError(err)
sum += time.Since(startTime)
controller.OnResponse(cas.resourceGroupName, rreq, rres)
Expand All @@ -356,7 +360,6 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
break
}
}
suite.cleanupResourceGroups()
controller.Stop()
}

Expand Down Expand Up @@ -408,9 +411,9 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() {
wreq := tcs.makeWriteRequest()
rres := tcs.makeReadResponse()
wres := tcs.makeWriteResponse()
_, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
_, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
re.NoError(err)
_, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
_, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
re.NoError(err)
controller.OnResponse(resourceGroupName, rreq, rres)
controller.OnResponse(resourceGroupName, wreq, wres)
Expand Down Expand Up @@ -447,9 +450,9 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() {
rres := cas.tcs[i].makeReadResponse()
wres := cas.tcs[i].makeWriteResponse()
startTime := time.Now()
_, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
_, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
re.NoError(err)
_, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
_, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
re.NoError(err)
sum += time.Since(startTime)
controller.OnResponse(resourceGroupName, rreq, rres)
Expand All @@ -467,32 +470,120 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() {
resourceGroupName2 := suite.initGroups[2].Name
tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 100000, times: 1, waitDuration: 0}
wreq := tcs.makeWriteRequest()
_, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq)
_, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq)
re.NoError(err)

re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/acceleratedSpeedTrend", "return(true)"))
resourceGroupName3 := suite.initGroups[3].Name
tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 1000, times: 1, waitDuration: 0}
wreq = tcs.makeWriteRequest()
_, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
_, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
re.NoError(err)
time.Sleep(110 * time.Millisecond)
tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 10, times: 1010, waitDuration: 0}
duration := time.Duration(0)
for i := 0; i < tcs.times; i++ {
wreq = tcs.makeWriteRequest()
startTime := time.Now()
_, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
_, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
duration += time.Since(startTime)
re.NoError(err)
}
re.Less(duration, 100*time.Millisecond)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/acceleratedReportingPeriod"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/acceleratedSpeedTrend"))
suite.cleanupResourceGroups()
controller.Stop()
}

// TestResourcePenalty verifies that OnRequestWait reports, as a penalty, the
// consumption this resource group has accumulated at OTHER stores since the
// previous request to the same store: zero for a first-seen store, zero for a
// repeat request to the same store, non-zero after writes landed elsewhere,
// and zero across different resource groups.
func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
	re := suite.Require()
	cli := suite.client

	for _, group := range suite.initGroups {
		resp, err := cli.AddResourceGroup(suite.ctx, group)
		re.NoError(err)
		re.Contains(resp, "Success!")
	}

	// Unit costs everywhere so penalties can be asserted directly in
	// bytes / milliseconds rather than derived RU values.
	cfg := &controller.RequestUnitConfig{
		ReadBaseCost:     1,
		ReadCostPerByte:  1,
		WriteBaseCost:    1,
		WriteCostPerByte: 1,
		CPUMsCost:        1,
	}
	// NOTE(review): the error from NewResourceGroupController is discarded —
	// TODO confirm this is intentional for the test setup.
	c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace())
	c.Start(suite.ctx)

	resourceGroupName := suite.initGroups[1].Name
	// init: first request ever for this group — no prior consumption, so no penalty.
	req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */)
	resp := controller.NewTestResponseInfo(0, time.Duration(30), true)
	_, penalty, err := c.OnRequestWait(suite.ctx, resourceGroupName, req)
	re.NoError(err)
	re.Equal(penalty.WriteBytes, float64(0))
	re.Equal(penalty.TotalCpuTimeMs, 0.0)
	_, err = c.OnResponse(resourceGroupName, req, resp)
	re.NoError(err)

	// First request to store1 — store1 is new to the counter map, penalty is zero.
	req = controller.NewTestRequestInfo(true, 60, 1 /* store1 */)
	resp = controller.NewTestResponseInfo(0, time.Duration(10), true)
	_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
	re.NoError(err)
	re.Equal(penalty.WriteBytes, float64(0))
	re.Equal(penalty.TotalCpuTimeMs, 0.0)
	_, err = c.OnResponse(resourceGroupName, req, resp)
	re.NoError(err)

	// failed request, shouldn't be counted in penalty
	req = controller.NewTestRequestInfo(true, 20, 1 /* store1 */)
	resp = controller.NewTestResponseInfo(0, time.Duration(0), false)
	_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
	re.NoError(err)
	re.Equal(penalty.WriteBytes, float64(0))
	re.Equal(penalty.TotalCpuTimeMs, 0.0)
	_, err = c.OnResponse(resourceGroupName, req, resp)
	re.NoError(err)

	// from same store, should be zero
	req1 := controller.NewTestRequestInfo(false, 0, 1 /* store1 */)
	resp1 := controller.NewTestResponseInfo(0, time.Duration(10), true)
	_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1)
	re.NoError(err)
	re.Equal(penalty.WriteBytes, float64(0))
	_, err = c.OnResponse(resourceGroupName, req1, resp1)
	re.NoError(err)

	// from different store, should be non-zero:
	// the 60 write bytes completed at store1 above (the failed 20-byte write
	// is excluded) show up as store2's penalty.
	req2 := controller.NewTestRequestInfo(true, 50, 2 /* store2 */)
	resp2 := controller.NewTestResponseInfo(0, time.Duration(10), true)
	_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2)
	re.NoError(err)
	re.Equal(penalty.WriteBytes, float64(60))
	// time.Duration(10) is 10ns; expressed in milliseconds that is 10/1000/1000.
	re.InEpsilon(penalty.TotalCpuTimeMs, 10.0/1000.0/1000.0, 1e-6)
	_, err = c.OnResponse(resourceGroupName, req2, resp2)
	re.NoError(err)

	// from new store, should be zero
	req3 := controller.NewTestRequestInfo(true, 0, 3 /* store3 */)
	resp3 := controller.NewTestResponseInfo(0, time.Duration(10), true)
	_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3)
	re.NoError(err)
	re.Equal(penalty.WriteBytes, float64(0))
	_, err = c.OnResponse(resourceGroupName, req3, resp3)
	re.NoError(err)

	// from different group, should be zero
	resourceGroupName = suite.initGroups[2].Name
	req4 := controller.NewTestRequestInfo(true, 50, 1 /* store1 */)
	resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true)
	_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4)
	re.NoError(err)
	re.Equal(penalty.WriteBytes, float64(0))
	_, err = c.OnResponse(resourceGroupName, req4, resp4)
	re.NoError(err)
}

func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
re := suite.Require()
cli := suite.client
Expand Down Expand Up @@ -557,7 +648,6 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
checkFunc(gresp, groups[0])
}
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/resource_manager/server/fastPersist"))
suite.cleanupResourceGroups()
}

func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() {
Expand Down Expand Up @@ -813,9 +903,6 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientFailover()
re.NotNil(getResp)
re.Equal(group.RUSettings.RU.Settings.FillRate, getResp.RUSettings.RU.Settings.FillRate)
}

// Cleanup the resource group.
suite.cleanupResourceGroups()
}

func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMode() {
Expand Down Expand Up @@ -874,7 +961,6 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo
controller.Stop()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/resource_manager/server/acquireFailed"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/degradedModeRU"))
suite.cleanupResourceGroups()
}

func (suite *resourceManagerClientTestSuite) TestLoadRequestUnitConfig() {
Expand Down Expand Up @@ -956,5 +1042,4 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() {

re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/fastCleanup"))
controller.Stop()
suite.cleanupResourceGroups()
}

0 comments on commit 9e7b8e8

Please sign in to comment.