Skip to content

Commit fd9fc16

Browse files
authored
Resource monitor: fix bugs + add tests (#6717)
* Fix bugs + add tests
  Signed-off-by: Justin Jung <jungjust@amazon.com>
* Changelog
  Signed-off-by: Justin Jung <jungjust@amazon.com>
* Add integration test
  Signed-off-by: Justin Jung <jungjust@amazon.com>
* Fix tests
  Signed-off-by: Justin Jung <jungjust@amazon.com>
---------
Signed-off-by: Justin Jung <jungjust@amazon.com>
1 parent 0e85ae0 commit fd9fc16

File tree

9 files changed

+116
-9
lines changed

9 files changed

+116
-9
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
//go:build requires_docker
2+
// +build requires_docker
3+
4+
package integration
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/require"
10+
11+
"github.com/cortexproject/cortex/integration/e2e"
12+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
13+
"github.com/cortexproject/cortex/integration/e2ecortex"
14+
)
15+
16+
func Test_ResourceBasedLimiter_shouldStartWithoutError(t *testing.T) {
17+
s, err := e2e.NewScenario(networkName)
18+
require.NoError(t, err)
19+
defer s.Close()
20+
21+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
22+
"-monitored.resources": "cpu,heap",
23+
})
24+
25+
// Start dependencies.
26+
consul := e2edb.NewConsul()
27+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
28+
require.NoError(t, s.StartAndWaitReady(consul, minio))
29+
30+
// Start Cortex components.
31+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
32+
"-ingester.instance-limits.cpu-utilization": "0.8",
33+
"-ingester.instance-limits.heap-utilization": "0.8",
34+
}), "")
35+
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
36+
"-store-gateway.instance-limits.cpu-utilization": "0.8",
37+
"-store-gateway.instance-limits.heap-utilization": "0.8",
38+
}), "")
39+
require.NoError(t, s.StartAndWaitReady(ingester, storeGateway))
40+
}

pkg/cortex/modules.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -770,26 +770,26 @@ func (t *Cortex) initQueryScheduler() (services.Service, error) {
770770
}
771771

772772
func (t *Cortex) initResourceMonitor() (services.Service, error) {
773-
if len(t.Cfg.MonitoredResources) == 0 {
773+
if t.Cfg.MonitoredResources.String() == "" || len(t.Cfg.MonitoredResources) == 0 {
774774
return nil, nil
775775
}
776776

777+
util_log.WarnExperimentalUse(fmt.Sprintf("resource monitor for [%s]", t.Cfg.MonitoredResources.String()))
778+
777779
containerLimits := make(map[resource.Type]float64)
778780
for _, res := range t.Cfg.MonitoredResources {
779781
switch resource.Type(res) {
780782
case resource.CPU:
781783
containerLimits[resource.Type(res)] = float64(runtime.GOMAXPROCS(0))
782784
case resource.Heap:
783785
containerLimits[resource.Type(res)] = float64(debug.SetMemoryLimit(-1))
786+
default:
787+
return nil, fmt.Errorf("unknown resource type: %s", res)
784788
}
785789
}
786790

787791
var err error
788792
t.ResourceMonitor, err = resource.NewMonitor(containerLimits, prometheus.DefaultRegisterer)
789-
if t.ResourceMonitor != nil {
790-
util_log.WarnExperimentalUse("resource monitor")
791-
}
792-
793793
return t.ResourceMonitor, err
794794
}
795795

@@ -798,7 +798,7 @@ func (t *Cortex) setupModuleManager() error {
798798

799799
// Register all modules here.
800800
// RegisterModule(name string, initFn func()(services.Service, error))
801-
mm.RegisterModule(ResourceMonitor, t.initResourceMonitor)
801+
mm.RegisterModule(ResourceMonitor, t.initResourceMonitor, modules.UserInvisibleModule)
802802
mm.RegisterModule(Server, t.initServer, modules.UserInvisibleModule)
803803
mm.RegisterModule(API, t.initAPI, modules.UserInvisibleModule)
804804
mm.RegisterModule(RuntimeConfig, t.initRuntimeConfig, modules.UserInvisibleModule)

pkg/cortex/modules_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,3 +232,16 @@ func Test_setupModuleManager(t *testing.T) {
232232
}
233233
}
234234
}
235+
236+
func Test_initResourceMonitor_shouldFailOnInvalidResource(t *testing.T) {
237+
cortex := &Cortex{
238+
Server: &server.Server{},
239+
Cfg: Config{
240+
MonitoredResources: []string{"invalid"},
241+
},
242+
}
243+
244+
// log warning message and spin up other cortex services
245+
_, err := cortex.initResourceMonitor()
246+
require.ErrorContains(t, err, "unknown resource type")
247+
}

pkg/util/limiter/resource_based_limiter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func NewResourceBasedLimiter(resourceMonitor resource.IMonitor, limits map[resou
3030
promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
3131
Name: "cortex_resource_based_limiter_limit",
3232
Help: "Limit set for the resource utilization.",
33-
ConstLabels: map[string]string{"component": component},
33+
ConstLabels: map[string]string{"component": component, "resource": string(resType)},
3434
}).Set(limit)
3535
default:
3636
return nil, fmt.Errorf("unsupported resource type: [%s]", resType)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package limiter
2+
3+
import (
4+
"testing"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/cortexproject/cortex/pkg/util/resource"
10+
)
11+
12+
func Test_ResourceBasedLimiter(t *testing.T) {
13+
limits := map[resource.Type]float64{
14+
resource.CPU: 0.5,
15+
resource.Heap: 0.5,
16+
}
17+
18+
_, err := NewResourceBasedLimiter(&mockMonitor{}, limits, prometheus.DefaultRegisterer, "ingester")
19+
require.NoError(t, err)
20+
}
21+
22+
// mockMonitor is a stateless test double passed to NewResourceBasedLimiter in
// place of a real resource monitor. It always reports zero utilization.
type mockMonitor struct{}

// GetCPUUtilization always reports zero CPU utilization.
func (*mockMonitor) GetCPUUtilization() float64 { return 0 }

// GetHeapUtilization always reports zero heap utilization.
func (*mockMonitor) GetHeapUtilization() float64 { return 0 }

pkg/util/resource/monitor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ func NewMonitor(limits map[Type]float64, registerer prometheus.Registerer) (*Mon
5050
m := &Monitor{
5151
containerLimit: limits,
5252
scanners: make(map[Type]scanner),
53+
utilization: make(map[Type]float64),
5354

5455
cpuRates: [dataPointsToAvg]float64{},
5556
cpuIntervals: [dataPointsToAvg]float64{},

pkg/util/resource/monitor_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package resource
2+
3+
import (
4+
"testing"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func Test_Monitor(t *testing.T) {
11+
m, err := NewMonitor(map[Type]float64{}, prometheus.DefaultRegisterer)
12+
13+
m.scanners[CPU] = &noopScanner{}
14+
m.containerLimit[CPU] = 1
15+
m.utilization[CPU] = 0.5
16+
17+
require.NoError(t, err)
18+
}

pkg/util/resource/scanner.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
)
77

88
const (
	// heapMetricName is the runtime/metrics sample name for the bytes occupied
	// by heap objects. Metric names are case-sensitive; the path segment must
	// be lowercase "heap" to match a metric the runtime actually exports.
	heapMetricName = "/memory/classes/heap/objects:bytes"
)
1111

1212
type scanner interface {
@@ -39,5 +39,5 @@ func newHeapScanner() (scanner, error) {
3939

4040
func (s *heapScanner) scan() (float64, error) {
4141
metrics.Read(s.metricSamples)
42-
return s.metricSamples[0].Value.Float64(), nil
42+
return float64(s.metricSamples[0].Value.Uint64()), nil
4343
}

pkg/util/resource/scanner_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,8 @@ func Test_NoopScanner(t *testing.T) {
1212
require.NoError(t, err)
1313
require.Zero(t, val)
1414
}
15+
16+
func Test_HeapScanner(t *testing.T) {
17+
_, err := newHeapScanner()
18+
require.NoError(t, err)
19+
}

0 commit comments

Comments
 (0)