Skip to content

monitoring: update apm-server metrics collection to avoid conflicts #17512

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions internal/beatcmd/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,14 +547,22 @@ func (b *Beat) registerStatsMetrics() {
}
v.OnRegistryStart()
defer v.OnRegistryFinished()

// First, collect the apm-server metrics from every matching scope into a single map.
beatsMetrics := make(map[string]any)
for _, sm := range rm.ScopeMetrics {
switch {
case strings.HasPrefix(sm.Scope.Name, "github.com/elastic/apm-server"):
// All simple scalar metrics that begin with the name "apm-server."
// in github.com/elastic/apm-server/... scopes are mapped directly.
addAPMServerMetrics(v, sm)
addAPMServerMetricsToMap(beatsMetrics, sm.Metrics)
}
}

// Report all collected metrics once, after every scope has been merged into the map.
// This prevents metrics that share a name prefix but come from different
// scoped meters from overwriting each other.
reportOnKey(v, beatsMetrics)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great finding!

})
}

Expand All @@ -577,9 +585,10 @@ func getScalarInt64(data metricdata.Aggregation) (int64, bool) {
return 0, false
}

func addAPMServerMetrics(v monitoring.Visitor, sm metricdata.ScopeMetrics) {
beatsMetrics := make(map[string]any)
for _, m := range sm.Metrics {
// addAPMServerMetricsToMap adds simple scalar metrics with the "apm-server." prefix
// to the map.
func addAPMServerMetricsToMap(beatsMetrics map[string]any, metrics []metricdata.Metrics) {
for _, m := range metrics {
if suffix, ok := strings.CutPrefix(m.Name, "apm-server."); ok {
if value, ok := getScalarInt64(m.Data); ok {
current := beatsMetrics
Expand All @@ -597,8 +606,6 @@ func addAPMServerMetrics(v monitoring.Visitor, sm metricdata.ScopeMetrics) {
}
}
}

reportOnKey(v, beatsMetrics)
}

func reportOnKey(v monitoring.Visitor, m map[string]any) {
Expand Down
49 changes: 48 additions & 1 deletion internal/beatcmd/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func TestLibbeatMetrics(t *testing.T) {
}, snapshot)
}

// TestAddAPMServerMetrics tests the basic functionality of metrics collection and reporting.
func TestAddAPMServerMetrics(t *testing.T) {
r := monitoring.NewRegistry()
sm := metricdata.ScopeMetrics{
Expand Down Expand Up @@ -296,7 +297,12 @@ func TestAddAPMServerMetrics(t *testing.T) {
}

monitoring.NewFunc(r, "apm-server", func(m monitoring.Mode, v monitoring.Visitor) {
addAPMServerMetrics(v, sm)
v.OnRegistryStart()
defer v.OnRegistryFinished()

beatsMetrics := make(map[string]any)
addAPMServerMetricsToMap(beatsMetrics, sm.Metrics)
reportOnKey(v, beatsMetrics)
})

snapshot := monitoring.CollectStructSnapshot(r, monitoring.Full, false)
Expand All @@ -305,6 +311,47 @@ func TestAddAPMServerMetrics(t *testing.T) {
"request": int64(1),
"response": int64(1),
},
}, snapshot["apm-server"])
}

// TestMonitoringApmServer verifies that apm-server metrics recorded through
// different scoped meters, but sharing the "apm-server.sampling" name prefix,
// all appear in the collected stats snapshot instead of overwriting each other.
func TestMonitoringApmServer(t *testing.T) {
	b := newNopBeat(t, "")
	b.registerStatsMetrics()

	ctx := context.Background()

	// Add metrics similar to lsm_size in storage_manager.go and events.processed in processor.go.
	// Instrument-creation errors are checked so that a failure here is reported
	// directly rather than as a confusing snapshot mismatch further down.
	meter := b.meterProvider.Meter("github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage")
	lsmSizeGauge, err := meter.Int64Gauge("apm-server.sampling.tail.storage.lsm_size")
	if err != nil {
		t.Fatalf("creating lsm_size gauge: %v", err)
	}
	lsmSizeGauge.Record(ctx, 123)

	meter2 := b.meterProvider.Meter("github.com/elastic/apm-server/x-pack/apm-server/sampling")
	processedCounter, err := meter2.Int64Counter("apm-server.sampling.tail.events.processed")
	if err != nil {
		t.Fatalf("creating events.processed counter: %v", err)
	}
	processedCounter.Add(ctx, 456)

	// A third scope whose metric shares only the "apm-server.sampling" prefix.
	meter3 := b.meterProvider.Meter("github.com/elastic/apm-server/x-pack/apm-server/foo")
	otherCounter, err := meter3.Int64Counter("apm-server.sampling.foo.request")
	if err != nil {
		t.Fatalf("creating foo.request counter: %v", err)
	}
	otherCounter.Add(ctx, 1)

	// Collect metrics.
	snapshot := monitoring.CollectStructSnapshot(b.Monitoring.StatsRegistry(), monitoring.Full, false)

	// Assert that the snapshot contains data for all scoped metrics
	// with the same metric name prefix 'apm-server.sampling'.
	assert.Equal(t, map[string]any{
		"apm-server": map[string]any{
			"sampling": map[string]any{
				"foo": map[string]any{
					"request": int64(1),
				},
				"tail": map[string]any{
					"storage": map[string]any{
						"lsm_size": int64(123),
					},
					"events": map[string]any{
						"processed": int64(456),
					},
				},
			},
		},
	}, snapshot)
}

Expand Down