Skip to content

Commit

Permalink
fix(aggregated-metrics): correctly create logfmt string (#14124)
Browse files Browse the repository at this point in the history
  • Loading branch information
svennergr authored Sep 12, 2024
1 parent 53cfef3 commit 63e84b4
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 15 deletions.
17 changes: 6 additions & 11 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/c2h5oh/datasize"
"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -156,9 +155,9 @@ func RecordRangeAndInstantQueryMetrics(
"status", status,
"limit", p.Limit(),
"returned_lines", returnedLines,
"throughput", humanizeBytes(uint64(stats.Summary.BytesProcessedPerSecond)),
"total_bytes", humanizeBytes(uint64(stats.Summary.TotalBytesProcessed)),
"total_bytes_structured_metadata", humanizeBytes(uint64(stats.Summary.TotalStructuredMetadataBytesProcessed)),
"throughput", util.HumanizeBytes(uint64(stats.Summary.BytesProcessedPerSecond)),
"total_bytes", util.HumanizeBytes(uint64(stats.Summary.TotalBytesProcessed)),
"total_bytes_structured_metadata", util.HumanizeBytes(uint64(stats.Summary.TotalStructuredMetadataBytesProcessed)),
"lines_per_second", stats.Summary.LinesProcessedPerSecond,
"total_lines", stats.Summary.TotalLinesProcessed,
"post_filter_lines", stats.Summary.TotalPostFilterLines,
Expand Down Expand Up @@ -197,11 +196,11 @@ func RecordRangeAndInstantQueryMetrics(
// Total ingester reached for this query.
"ingester_requests", stats.Ingester.GetTotalReached(),
// Total bytes processed but was already in memory (found in the headchunk). Includes structured metadata bytes.
"ingester_chunk_head_bytes", humanizeBytes(uint64(stats.Ingester.Store.Chunk.GetHeadChunkBytes())),
"ingester_chunk_head_bytes", util.HumanizeBytes(uint64(stats.Ingester.Store.Chunk.GetHeadChunkBytes())),
// Total bytes of compressed chunks (blocks) processed.
"ingester_chunk_compressed_bytes", humanizeBytes(uint64(stats.Ingester.Store.Chunk.GetCompressedBytes())),
"ingester_chunk_compressed_bytes", util.HumanizeBytes(uint64(stats.Ingester.Store.Chunk.GetCompressedBytes())),
// Total bytes decompressed and processed from chunks. Includes structured metadata bytes.
"ingester_chunk_decompressed_bytes", humanizeBytes(uint64(stats.Ingester.Store.Chunk.GetDecompressedBytes())),
"ingester_chunk_decompressed_bytes", util.HumanizeBytes(uint64(stats.Ingester.Store.Chunk.GetDecompressedBytes())),
// Total lines post filtering.
"ingester_post_filter_lines", stats.Ingester.Store.Chunk.GetPostFilterLines(),
// Time spent being blocked on congestion control.
Expand Down Expand Up @@ -243,10 +242,6 @@ func RecordRangeAndInstantQueryMetrics(
recordUsageStats(queryType, stats)
}

func humanizeBytes(val uint64) string {
return strings.Replace(humanize.Bytes(val), " ", "", 1)
}

func RecordLabelQueryMetrics(
ctx context.Context,
log log.Logger,
Expand Down
8 changes: 4 additions & 4 deletions pkg/pattern/aggregation/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"sync"
"time"

"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/golang/snappy"
Expand All @@ -22,6 +21,7 @@ import (
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/build"

"github.com/grafana/dskit/backoff"
Expand Down Expand Up @@ -312,17 +312,17 @@ func AggregatedMetricEntry(
service string,
lbls labels.Labels,
) string {
byteString := humanize.Bytes(totalBytes)
byteString := util.HumanizeBytes(totalBytes)
base := fmt.Sprintf(
"ts=%d bytes=%s count=%d %s=%s",
"ts=%d bytes=%s count=%d %s=\"%s\"",
ts.UnixNano(),
byteString,
totalCount,
push.LabelServiceName, service,
)

for _, l := range lbls {
base += fmt.Sprintf(" %s=%s", l.Name, l.Value)
base += fmt.Sprintf(" %s=\"%s\"", l.Name, l.Value)
}

return base
Expand Down
3 changes: 3 additions & 0 deletions pkg/pattern/aggregation/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ func Test_Push(t *testing.T) {
stream2.Entries[2].Line,
)

// sanity check that bytes are logged in humanized form without whitespaces
assert.Contains(t, stream1.Entries[0].Line, "bytes=1B")

case <-time.After(5 * time.Second):
t.Fatal("timeout")
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/metrics_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"errors"
"fmt"
"math"
"strings"
"sync"

humanize "github.com/dustin/go-humanize"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
Expand Down Expand Up @@ -841,3 +843,8 @@ func RegisterCounterVec(registerer prometheus.Registerer, namespace, name, help
}
return vec
}

// HumanizeBytes returns a human readable string representation of the given byte value and removes all whitespaces.
func HumanizeBytes(val uint64) string {
return strings.Replace(humanize.Bytes(val), " ", "", 1)
}
15 changes: 15 additions & 0 deletions pkg/util/metrics_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1158,3 +1158,18 @@ func verifyLabels(t *testing.T, m prometheus.Collector, filter map[string]string

require.Equal(t, expectedLabels, result)
}

func TestHumanizeBytes(t *testing.T) {
tests := map[uint64]string{
1024: "1.0kB",
1024 * 1000: "1.0MB",
1024 * 1000 * 1000: "1.0GB",
10: "10B",
}

for bytes, humanizedBytes := range tests {
t.Run(fmt.Sprintf("%d", bytes), func(t *testing.T) {
require.Equal(t, humanizedBytes, HumanizeBytes(bytes))
})
}
}

0 comments on commit 63e84b4

Please sign in to comment.