Skip to content

Commit c4d3bbf

Browse files
monitoring: update apm-server metrics collection to avoid conflicts (#17512) (#17527)
* monitoring: update apm-server metrics collection to avoid conflicts * refactor beats monitoring since there are no more global registries * removed redundant temp slice from apm-server monitoring func (cherry picked from commit f1c279b) Co-authored-by: Isaac Flores <34590010+isaacaflores2@users.noreply.github.com>
1 parent c38fd02 commit c4d3bbf

File tree

2 files changed

+61
-7
lines changed

2 files changed

+61
-7
lines changed

internal/beatcmd/beat.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -547,14 +547,22 @@ func (b *Beat) registerStatsMetrics() {
547547
}
548548
v.OnRegistryStart()
549549
defer v.OnRegistryFinished()
550+
551+
// first collect all apm-server metrics
552+
beatsMetrics := make(map[string]any)
550553
for _, sm := range rm.ScopeMetrics {
551554
switch {
552555
case strings.HasPrefix(sm.Scope.Name, "github.com/elastic/apm-server"):
553556
// All simple scalar metrics that begin with the name "apm-server."
554557
// in github.com/elastic/apm-server/... scopes are mapped directly.
555-
addAPMServerMetrics(v, sm)
558+
addAPMServerMetricsToMap(beatsMetrics, sm.Metrics)
556559
}
557560
}
561+
562+
// register all metrics once
563+
// this prevents metrics with the same prefix in the name
564+
// from different scoped meters from overwriting each other
565+
reportOnKey(v, beatsMetrics)
558566
})
559567
}
560568

@@ -577,9 +585,10 @@ func getScalarInt64(data metricdata.Aggregation) (int64, bool) {
577585
return 0, false
578586
}
579587

580-
func addAPMServerMetrics(v monitoring.Visitor, sm metricdata.ScopeMetrics) {
581-
beatsMetrics := make(map[string]any)
582-
for _, m := range sm.Metrics {
588+
// addAPMServerMetricsToMap adds simple scalar metrics with the "apm-server." prefix
589+
// to the map.
590+
func addAPMServerMetricsToMap(beatsMetrics map[string]any, metrics []metricdata.Metrics) {
591+
for _, m := range metrics {
583592
if suffix, ok := strings.CutPrefix(m.Name, "apm-server."); ok {
584593
if value, ok := getScalarInt64(m.Data); ok {
585594
current := beatsMetrics
@@ -597,8 +606,6 @@ func addAPMServerMetrics(v monitoring.Visitor, sm metricdata.ScopeMetrics) {
597606
}
598607
}
599608
}
600-
601-
reportOnKey(v, beatsMetrics)
602609
}
603610

604611
func reportOnKey(v monitoring.Visitor, m map[string]any) {

internal/beatcmd/beat_test.go

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ func TestLibbeatMetrics(t *testing.T) {
268268
}, snapshot)
269269
}
270270

271+
// TestAddAPMServerMetrics tests basic functionality of the metrics collection and reporting
271272
func TestAddAPMServerMetrics(t *testing.T) {
272273
r := monitoring.NewRegistry()
273274
sm := metricdata.ScopeMetrics{
@@ -296,7 +297,12 @@ func TestAddAPMServerMetrics(t *testing.T) {
296297
}
297298

298299
monitoring.NewFunc(r, "apm-server", func(m monitoring.Mode, v monitoring.Visitor) {
299-
addAPMServerMetrics(v, sm)
300+
v.OnRegistryStart()
301+
defer v.OnRegistryFinished()
302+
303+
beatsMetrics := make(map[string]any)
304+
addAPMServerMetricsToMap(beatsMetrics, sm.Metrics)
305+
reportOnKey(v, beatsMetrics)
300306
})
301307

302308
snapshot := monitoring.CollectStructSnapshot(r, monitoring.Full, false)
@@ -305,6 +311,47 @@ func TestAddAPMServerMetrics(t *testing.T) {
305311
"request": int64(1),
306312
"response": int64(1),
307313
},
314+
}, snapshot["apm-server"])
315+
}
316+
317+
func TestMonitoringApmServer(t *testing.T) {
318+
b := newNopBeat(t, "")
319+
b.registerStatsMetrics()
320+
321+
// add metrics similar to lsm_size in storage_manager.go and events.processed in processor.go
322+
meter := b.meterProvider.Meter("github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage")
323+
lsmSizeGauge, _ := meter.Int64Gauge("apm-server.sampling.tail.storage.lsm_size")
324+
lsmSizeGauge.Record(context.Background(), 123)
325+
326+
meter2 := b.meterProvider.Meter("github.com/elastic/apm-server/x-pack/apm-server/sampling")
327+
processedCounter, _ := meter2.Int64Counter("apm-server.sampling.tail.events.processed")
328+
processedCounter.Add(context.Background(), 456)
329+
330+
meter3 := b.meterProvider.Meter("github.com/elastic/apm-server/x-pack/apm-server/foo")
331+
otherCounter, _ := meter3.Int64Counter("apm-server.sampling.foo.request")
332+
otherCounter.Add(context.Background(), 1)
333+
334+
// collect metrics
335+
snapshot := monitoring.CollectStructSnapshot(b.Monitoring.StatsRegistry(), monitoring.Full, false)
336+
337+
// assert that the snapshot contains data for all scoped metrics
338+
// with the same metric name prefix 'apm-server.sampling'
339+
assert.Equal(t, map[string]any{
340+
"apm-server": map[string]any{
341+
"sampling": map[string]any{
342+
"foo": map[string]any{
343+
"request": int64(1),
344+
},
345+
"tail": map[string]any{
346+
"storage": map[string]any{
347+
"lsm_size": int64(123),
348+
},
349+
"events": map[string]any{
350+
"processed": int64(456),
351+
},
352+
},
353+
},
354+
},
308355
}, snapshot)
309356
}
310357

0 commit comments

Comments
 (0)