Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

merge

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>
  • Loading branch information
CabinfeverB committed Jan 12, 2023
1 parent 4ffccbc commit 9141699
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 265 deletions.
65 changes: 10 additions & 55 deletions pkg/mcs/resource_manager/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ type ResourceGroupProvider interface {
func NewResourceGroupController(
clientUniqueId uint64,
provider ResourceGroupProvider,
requestUnitConfig *RequestUnitConfig,
) (*resourceGroupsController, error) {
return newResourceGroupController(clientUniqueId, provider)
return newResourceGroupController(clientUniqueId, provider, requestUnitConfig)
}

var _ ResourceGroupKVInterceptor = (*resourceGroupsController)(nil)
Expand Down Expand Up @@ -86,12 +87,17 @@ type resourceGroupsController struct {
}
}

func newResourceGroupController(clientUniqueId uint64, provider ResourceGroupProvider) (*resourceGroupsController, error) {
config := DefaultConfig()
func newResourceGroupController(clientUniqueId uint64, provider ResourceGroupProvider, requestUnitConfig *RequestUnitConfig) (*resourceGroupsController, error) {
var config *Config
if requestUnitConfig != nil {
config = generateConfig(requestUnitConfig)
} else {
config = DefaultConfig()
}
return &resourceGroupsController{
clientUniqueId: clientUniqueId,
provider: provider,
config: DefaultConfig(),
config: config,
lowTokenNotifyChan: make(chan struct{}, 1),
tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1),
calculators: []ResourceCalculator{newKVCalculator(config), newSQLLayerCPUCalculateor(config)},
Expand Down Expand Up @@ -476,11 +482,7 @@ func (gc *groupCostController) handleTokenBucketTrickEvent(ctx context.Context)
case <-counter.setupNotificationCh:
counter.setupNotificationTimer = nil
counter.setupNotificationCh = nil
<<<<<<< HEAD:pkg/mcs/resource_manager/client/client.go
counter.limiter.SetupNotificationAt(gc.run.now, float64(counter.setupNotificationThreshold))
=======
counter.limiter.SetupNotification(gc.run.now, counter.setupNotificationThreshold)
>>>>>>> 49a78a80a7ba30505845094295fe5e3f2a802cf0:pkg/mcs/resource_manager/tenant_client/client.go
gc.updateRunState(ctx)
default:
}
Expand All @@ -491,11 +493,7 @@ func (gc *groupCostController) handleTokenBucketTrickEvent(ctx context.Context)
case <-counter.setupNotificationCh:
counter.setupNotificationTimer = nil
counter.setupNotificationCh = nil
<<<<<<< HEAD:pkg/mcs/resource_manager/client/client.go
counter.limiter.SetupNotificationAt(gc.run.now, float64(counter.setupNotificationThreshold))
=======
counter.limiter.SetupNotification(gc.run.now, counter.setupNotificationThreshold)
>>>>>>> 49a78a80a7ba30505845094295fe5e3f2a802cf0:pkg/mcs/resource_manager/tenant_client/client.go
gc.updateRunState(ctx)
default:
}
Expand Down Expand Up @@ -765,61 +763,18 @@ func (gc *groupCostController) OnResponse(ctx context.Context, req RequestInfo,
switch gc.mode {
case rmpb.GroupMode_RawMode:
for typ, counter := range gc.run.resourceTokens {
<<<<<<< HEAD:pkg/mcs/resource_manager/client/client.go
if v := GetResourceValueFromConsumption(delta, typ); v > 0 {
counter.limiter.RemoveTokens(time.Now(), float64(v))
=======
v, ok := deltaResource[typ]
if ok {
counter.limiter.RemoveTokens(time.Now(), v)
>>>>>>> 49a78a80a7ba30505845094295fe5e3f2a802cf0:pkg/mcs/resource_manager/tenant_client/client.go
}
}
case rmpb.GroupMode_RUMode:
for typ, counter := range gc.run.requestUnitTokens {
<<<<<<< HEAD:pkg/mcs/resource_manager/client/client.go
if v := GetRUValueFromConsumption(delta, typ); v > 0 {
counter.limiter.RemoveTokens(time.Now(), float64(v))
=======
v, ok := deltaRequestUnit[typ]
if ok {
counter.limiter.RemoveTokens(time.Now(), v)
>>>>>>> 49a78a80a7ba30505845094295fe5e3f2a802cf0:pkg/mcs/resource_manager/tenant_client/client.go
}
}
}
<<<<<<< HEAD:pkg/mcs/resource_manager/client/client.go
gc.mu.Lock()
Add(gc.mu.consumption, delta)
gc.mu.Unlock()
=======
}

func (c *resourceGroupsController) addDemoResourceGroup(ctx context.Context) error {
setting := &rmpb.GroupSettings{
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RRU: &rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
Fillrate: 2000,
BurstLimit: 20000000,
},
},
WRU: &rmpb.TokenBucket{
Tokens: 200000,
Settings: &rmpb.TokenLimitSettings{
Fillrate: 20000,
BurstLimit: 2000000,
},
},
},
}
context, err := c.provider.AddResourceGroup(ctx, "demo", setting)
if err != nil {
return err
}
log.Info("add resource group", zap.String("resp", context), zap.Any("setting", setting))
return err
>>>>>>> 49a78a80a7ba30505845094295fe5e3f2a802cf0:pkg/mcs/resource_manager/tenant_client/client.go
}
9 changes: 4 additions & 5 deletions pkg/mcs/resource_manager/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
)

var (
ruLen = len(rmpb.RequestUnitType_name)
resourceLen = len(rmpb.ResourceType_name)
requestUnitList map[rmpb.RequestUnitType]struct{} = map[rmpb.RequestUnitType]struct{}{
rmpb.RequestUnitType_RRU: {},
rmpb.RequestUnitType_WRU: {},
Expand Down Expand Up @@ -108,17 +106,18 @@ func DefaultConfig() *Config {
cfg := generateConfig(
DefaultRequestUnitConfig(),
)
cfg.groupLoopUpdateInterval = defaultGroupLoopUpdateInterval
cfg.targetPeriod = defaultTargetPeriod
return cfg
}

func generateConfig(ruConfig *RequestUnitConfig) *Config {
return &Config{
cfg := &Config{
ReadBaseCost: RequestUnit(ruConfig.ReadBaseCost),
ReadBytesCost: RequestUnit(ruConfig.ReadCostPerByte),
WriteBaseCost: RequestUnit(ruConfig.WriteBaseCost),
WriteBytesCost: RequestUnit(ruConfig.WriteCostPerByte),
WriteCPUMsCost: RequestUnit(ruConfig.WriteCPUMsCost),
}
cfg.groupLoopUpdateInterval = defaultGroupLoopUpdateInterval
cfg.targetPeriod = defaultTargetPeriod
return cfg
}
87 changes: 0 additions & 87 deletions pkg/mcs/resource_manager/client/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,93 +76,6 @@ func Sub(custom1 *rmpb.Consumption, custom2 *rmpb.Consumption) {
custom1.KvWriteRpcCount -= custom1.KvWriteRpcCount
}

// type ResourceCalculator interface {
// Trickle(map[rmpb.ResourceType]float64, map[rmpb.RequestUnitType]float64, *rmpb.Consumption, context.Context)
// BeforeKVRequest(map[rmpb.ResourceType]float64, map[rmpb.RequestUnitType]float64, *rmpb.Consumption, RequestInfo)
// AfterKVRequest(map[rmpb.ResourceType]float64, map[rmpb.RequestUnitType]float64, *rmpb.Consumption, RequestInfo, ResponseInfo)
// }

// type KVCalculator struct {
// *Config
// }

// func newKVCalculator(cfg *Config) *KVCalculator {
// return &KVCalculator{Config: cfg}
// }

// func (dwc *KVCalculator) Trickle(resource map[rmpb.ResourceType]float64, ru map[rmpb.RequestUnitType]float64, consumption *rmpb.Consumption, ctx context.Context) {
// }

// func (dwc *KVCalculator) BeforeKVRequest(resource map[rmpb.ResourceType]float64, ru map[rmpb.RequestUnitType]float64, consumption *rmpb.Consumption, req RequestInfo) {
// if req.IsWrite() {
// writeBytes := float64(req.WriteBytes())
// // for resource
// resource[rmpb.ResourceType_IOWriteFlow] += writeBytes
// // for RU
// wru := float64(dwc.WriteBytesCost) * writeBytes
// ru[rmpb.RequestUnitType_WRU] += wru
// // for consumption
// consumption.KvWriteRpcCount += 1
// consumption.WRU += wru
// consumption.WriteBytes += writeBytes

// } else {
// // none for resource
// // none for RU
// // for consumption
// consumption.KvReadRpcCount += 1
// }
// }
// func (dwc *KVCalculator) AfterKVRequest(resource map[rmpb.ResourceType]float64, ru map[rmpb.RequestUnitType]float64, consumption *rmpb.Consumption, req RequestInfo, res ResponseInfo) {
// readBytes := float64(res.ReadBytes())
// kvCPUms := float64(res.KVCPUms())
// // for resource
// resource[rmpb.ResourceType_IOReadFlow] += readBytes
// resource[rmpb.ResourceType_CPU] += kvCPUms
// // for RU
// ru_io := readBytes * float64(dwc.ReadBytesCost)
// ru_cpu := kvCPUms * float64(dwc.KVCPUMsCost)
// ru[rmpb.RequestUnitType_RRU] += ru_cpu + ru_io
// // for consumption
// consumption.RRU += ru_cpu + ru_io
// consumption.ReadBytes += readBytes
// consumption.TotalCpuTimeMs += kvCPUms
// }

// type SQLLayerCPUCalculateor struct {
// *Config
// }

// func newSQLLayerCPUCalculateor(cfg *Config) *SQLLayerCPUCalculateor {
// return &SQLLayerCPUCalculateor{Config: cfg}
// }

// func (dsc *SQLLayerCPUCalculateor) Trickle(resource map[rmpb.ResourceType]float64, ru map[rmpb.RequestUnitType]float64, consumption *rmpb.Consumption, ctx context.Context) {
// // TODO: SQL Layer RU/resource custom
// cpuFunc := func(ctx context.Context) float64 {
// return 0.
// }
// cpu := cpuFunc(ctx)
// // for resource
// resource[rmpb.ResourceType_CPU] += cpu
// // for RU
// ru_cpu := cpu * float64(dsc.SQLCPUSecondCost)
// // TODO: SQL Layer RU/resource custom type
// ru[rmpb.RequestUnitType_RRU] += ru_cpu / 2
// ru[rmpb.RequestUnitType_WRU] += ru_cpu / 2
// // for consumption
// // TODO: SQL Layer RU/resource custom type
// consumption.RRU += ru_cpu / 2
// consumption.RRU += ru_cpu / 2
// consumption.TotalCpuTimeMs += cpu
// consumption.SqlLayerCpuTimeMs += cpu
// }

// func (dsc *SQLLayerCPUCalculateor) BeforeKVRequest(resource map[rmpb.ResourceType]float64, ru map[rmpb.RequestUnitType]float64, consumption *rmpb.Consumption, req RequestInfo) {
// }
// func (dsc *SQLLayerCPUCalculateor) AfterKVRequest(resource map[rmpb.ResourceType]float64, ru map[rmpb.RequestUnitType]float64, consumption *rmpb.Consumption, req RequestInfo, res ResponseInfo) {
// }

type ResourceCalculator interface {
Trickle(*rmpb.Consumption, context.Context)
BeforeKVRequest(*rmpb.Consumption, RequestInfo)
Expand Down
118 changes: 0 additions & 118 deletions pkg/mcs/resource_manager/tenant_client/model.go

This file was deleted.

0 comments on commit 9141699

Please sign in to comment.