Skip to content

Commit

Permalink
added logic to exclude detected_level label while calculating struc…
Browse files Browse the repository at this point in the history
…uture metadata size because it's injected on distributors' size(for most of the cases).

Signed-off-by: Vladyslav Diachenko <vlad.diachenko@grafana.com>
  • Loading branch information
vlad-diachenko committed Oct 24, 2024
1 parent 5a10e5d commit daf77c8
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 15 deletions.
9 changes: 2 additions & 7 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ var (

func TestDistributor(t *testing.T) {
lineSize := 10
// detected_level label will be added to all log entries
strucutredMetadataSize := len(constants.LevelLabel) + len(constants.LogLevelUnknown)
entryTotalSize := lineSize + strucutredMetadataSize
ingestionRateLimit := datasize.ByteSize(400)
ingestionRateLimitMB := ingestionRateLimit.MBytes() // 400 Bytes/s limit

Expand All @@ -78,7 +75,7 @@ func TestDistributor(t *testing.T) {
{
lines: 100,
streams: 1,
expectedErrors: []error{httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", ingestionRateLimit, 100, 100*entryTotalSize)},
expectedErrors: []error{httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", ingestionRateLimit, 100, 100*lineSize)},
},
{
lines: 100,
Expand Down Expand Up @@ -1229,9 +1226,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {

distributors, _ := prepare(t, testData.distributors, 5, limits, nil)
for _, push := range testData.pushes {
// each log entry will be added structured metadata label `detected_level:"unknown"` that adds additional 21 bytes on distributor side.
structuredMetadataSize := 21
request := makeWriteRequest(1, push.bytes-structuredMetadataSize)
request := makeWriteRequest(1, push.bytes)
response, err := distributors[0].Push(ctx, request)

if push.expectedError == nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/loghttp/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
stats.StreamLabelsSize += int64(labelsSize(logproto.FromLabelsToLabelAdapters(lbs)))
}

resourceAttributesAsStructuredMetadataSize := labelsSize(resourceAttributesAsStructuredMetadata)
resourceAttributesAsStructuredMetadataSize := loki_util.StructuredMetadataSize(resourceAttributesAsStructuredMetadata)
retentionPeriodForUser := tenantsRetention.RetentionPeriodFor(userID, lbs)

stats.StructuredMetadataBytes[retentionPeriodForUser] += int64(resourceAttributesAsStructuredMetadataSize)
Expand Down Expand Up @@ -250,7 +250,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
})
}

scopeAttributesAsStructuredMetadataSize := labelsSize(scopeAttributesAsStructuredMetadata)
scopeAttributesAsStructuredMetadataSize := loki_util.StructuredMetadataSize(scopeAttributesAsStructuredMetadata)
stats.StructuredMetadataBytes[retentionPeriodForUser] += int64(scopeAttributesAsStructuredMetadataSize)
if tracker != nil {
tracker.ReceivedBytesAdd(ctx, userID, retentionPeriodForUser, lbs, float64(scopeAttributesAsStructuredMetadataSize))
Expand All @@ -276,7 +276,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
stream.Entries = append(stream.Entries, entry)
pushRequestsByStream[labelsStr] = stream

metadataSize := int64(labelsSize(entry.StructuredMetadata) - resourceAttributesAsStructuredMetadataSize - scopeAttributesAsStructuredMetadataSize)
metadataSize := int64(loki_util.StructuredMetadataSize(entry.StructuredMetadata) - resourceAttributesAsStructuredMetadataSize - scopeAttributesAsStructuredMetadataSize)
stats.StructuredMetadataBytes[retentionPeriodForUser] += metadataSize
stats.LogLinesBytes[retentionPeriodForUser] += int64(len(entry.Line))

Expand Down
5 changes: 1 addition & 4 deletions pkg/loghttp/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,7 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe
}
for _, e := range s.Entries {
pushStats.NumLines++
var entryLabelsSize int64
for _, l := range e.StructuredMetadata {
entryLabelsSize += int64(len(l.Name) + len(l.Value))
}
entryLabelsSize := int64(util.StructuredMetadataSize(e.StructuredMetadata))
pushStats.LogLinesBytes[retentionPeriod] += int64(len(e.Line))
pushStats.StructuredMetadataBytes[retentionPeriod] += entryLabelsSize

Expand Down
13 changes: 12 additions & 1 deletion pkg/util/entry_size.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package util

import "github.com/grafana/loki/pkg/push"
import (
"golang.org/x/exp/slices"

"github.com/grafana/loki/pkg/push"

"github.com/grafana/loki/v3/pkg/util/constants"
)

func EntriesTotalSize(entries []push.Entry) int {
size := 0
Expand All @@ -14,9 +20,14 @@ func EntryTotalSize(entry *push.Entry) int {
return len(entry.Line) + StructuredMetadataSize(entry.StructuredMetadata)
}

var excludedStructuredMetadataLabels = []string{constants.LevelLabel}

func StructuredMetadataSize(metas push.LabelsAdapter) int {
size := 0
for _, meta := range metas {
if slices.Contains(excludedStructuredMetadataLabels, meta.Name) {
continue
}
size += len(meta.Name) + len(meta.Value)
}
return size
Expand Down

0 comments on commit daf77c8

Please sign in to comment.