Skip to content

Commit b748748

Browse files
committed
benchmark: add benchmark processing large OTLP messages :dev:
Signed-off-by: Laurent Dufresne <laurent.dufresne@grafana.com>
1 parent 30d13f7 commit b748748

File tree

1 file changed

+92
-0
lines changed

1 file changed

+92
-0
lines changed

pkg/distributor/otel_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ import (
77
"compress/gzip"
88
"context"
99
"encoding/json"
10+
"fmt"
1011
"io"
1112
"net/http"
1213
"net/http/httptest"
14+
"runtime"
1315
"slices"
1416
"strconv"
1517
"strings"
@@ -1140,6 +1142,96 @@ func BenchmarkOTLPHandler(b *testing.B) {
11401142
})
11411143
}
11421144

1145+
func BenchmarkOTLPHandlerWithLargeMessage(b *testing.B) {
1146+
const NumberOfMetrics = 70
1147+
const NumberOfScopedMetrics = 25
1148+
const NumberOfResourceMetrics = 300
1149+
const stepDuration time.Duration = 10 * time.Second
1150+
const defaultDuration time.Duration = 60 * time.Second
1151+
now := time.Date(2020, time.October, 30, 23, 0, 0, 0, time.UTC)
1152+
1153+
createMetrics := func(mts pmetric.MetricSlice, count int, startTime time.Time, duration time.Duration) {
1154+
mts.EnsureCapacity(count)
1155+
for idx := range count {
1156+
mt := mts.AppendEmpty()
1157+
mt.SetName(fmt.Sprintf("metric-%d", idx))
1158+
datapoints := mt.SetEmptyGauge().DataPoints()
1159+
1160+
nDatapoints := int((int64(duration) + (int64(stepDuration) - 1)) / int64(stepDuration))
1161+
datapoints.EnsureCapacity(nDatapoints)
1162+
1163+
sampleTime := startTime
1164+
for range nDatapoints {
1165+
datapoint := datapoints.AppendEmpty()
1166+
datapoint.SetTimestamp(pcommon.NewTimestampFromTime(sampleTime))
1167+
attrs := datapoint.Attributes()
1168+
attrs.PutStr("route", "/hello")
1169+
attrs.PutStr("status", "200")
1170+
sampleTime = sampleTime.Add(stepDuration)
1171+
}
1172+
}
1173+
}
1174+
1175+
createManyScopedMetrics := func(rm pmetric.ResourceMetrics, count int) {
1176+
sms := rm.ScopeMetrics()
1177+
sms.EnsureCapacity(count)
1178+
for idx := range count {
1179+
scopeName := fmt.Sprintf("scope-%d", idx)
1180+
1181+
sm := sms.AppendEmpty()
1182+
scope := sm.Scope()
1183+
scope.SetName(scopeName)
1184+
attrs := scope.Attributes()
1185+
attrs.PutStr("package", scopeName)
1186+
1187+
metrics := sm.Metrics()
1188+
createMetrics(metrics, NumberOfMetrics, now, defaultDuration)
1189+
}
1190+
}
1191+
1192+
createResourceMetrics := func(md pmetric.Metrics, count int) {
1193+
rms := md.ResourceMetrics()
1194+
rms.EnsureCapacity(count)
1195+
for idx := range count {
1196+
rm := rms.AppendEmpty()
1197+
attrs := rm.Resource().Attributes()
1198+
attrs.PutStr("env", "dev")
1199+
attrs.PutStr("region", "us-east-1")
1200+
attrs.PutStr("pod", fmt.Sprintf("pod-%d", idx))
1201+
createManyScopedMetrics(rm, NumberOfScopedMetrics)
1202+
}
1203+
}
1204+
1205+
pushFunc := func(_ context.Context, pushReq *Request) error {
1206+
if _, err := pushReq.WriteRequest(); err != nil {
1207+
return err
1208+
}
1209+
1210+
pushReq.CleanUp()
1211+
return nil
1212+
}
1213+
limits := validation.MockDefaultOverrides()
1214+
handler := OTLPHandler(
1215+
200000000, nil, nil, limits, nil, nil,
1216+
RetryConfig{}, nil, pushFunc, nil, nil, log.NewNopLogger(),
1217+
)
1218+
1219+
b.Run("protobuf", func(b *testing.B) {
1220+
md := pmetric.NewMetrics()
1221+
createResourceMetrics(md, NumberOfResourceMetrics)
1222+
exportReq := pmetricotlp.NewExportRequestFromMetrics(md)
1223+
req := createOTLPProtoRequest(b, exportReq, "")
1224+
1225+
b.ResetTimer()
1226+
for i := 0; i < b.N; i++ {
1227+
resp := httptest.NewRecorder()
1228+
handler.ServeHTTP(resp, req)
1229+
require.Equal(b, http.StatusOK, resp.Code)
1230+
req.Body.(*reusableReader).Reset()
1231+
}
1232+
})
1233+
}
1234+
11431235
func createOTLPProtoRequest(tb testing.TB, metricRequest pmetricotlp.ExportRequest, compression string) *http.Request {
11441236
tb.Helper()
11451237

0 commit comments

Comments
 (0)