
Commit

Move ingester metrics around (#6275)
simonswine authored Jun 1, 2022
1 parent f58a14f commit 7f50a26
Showing 6 changed files with 148 additions and 140 deletions.
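The change is mechanical: the chunk and flush metrics that used to be package-level promauto globals in pkg/ingester/flush.go become fields of the ingesterMetrics struct, are created through promauto.With(r) against the Registerer handed to newIngesterMetrics, and are reached as i.metrics.<field> at the call sites. A minimal sketch of the pattern being applied, using a hypothetical fooTotal counter rather than any metric from this diff:

package ingester

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Before: a package-level metric, implicitly registered on
// prometheus.DefaultRegisterer as soon as the package is loaded.
var fooTotal = promauto.NewCounter(prometheus.CounterOpts{
	Namespace: "loki",
	Name:      "ingester_foo_total",
	Help:      "Hypothetical example counter.",
})

// After: the metric is a struct field, registered only against whatever
// Registerer the caller passes in (a nil Registerer means "don't register").
type exampleMetrics struct {
	fooTotal prometheus.Counter
}

func newExampleMetrics(r prometheus.Registerer) *exampleMetrics {
	return &exampleMetrics{
		fooTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
			Namespace: "loki",
			Name:      "ingester_foo_total",
			Help:      "Hypothetical example counter.",
		}),
	}
}

The practical difference is that callers decide where (and whether) the metrics are registered, instead of everything landing on the process-wide default registry at import time.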
113 changes: 18 additions & 95 deletions pkg/ingester/flush.go
@@ -9,7 +9,6 @@ import (

"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/user"
@@ -19,87 +18,11 @@ import (

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/storage/chunk"
- "github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
loki_util "github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
)

var (
chunkUtilization = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "ingester_chunk_utilization",
Help: "Distribution of stored chunk utilization (when stored).",
Buckets: prometheus.LinearBuckets(0, 0.2, 6),
})
memoryChunks = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "loki",
Name: "ingester_memory_chunks",
Help: "The total number of chunks in memory.",
})
chunkEntries = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "ingester_chunk_entries",
Help: "Distribution of stored lines per chunk (when stored).",
Buckets: prometheus.ExponentialBuckets(200, 2, 9), // biggest bucket is 200*2^(9-1) = 51200
})
chunkSize = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "ingester_chunk_size_bytes",
Help: "Distribution of stored chunk sizes (when stored).",
Buckets: prometheus.ExponentialBuckets(20000, 2, 10), // biggest bucket is 20000*2^(10-1) = 10,240,000 (~10.2MB)
})
chunkCompressionRatio = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "ingester_chunk_compression_ratio",
Help: "Compression ratio of chunks (when stored).",
Buckets: prometheus.LinearBuckets(.75, 2, 10),
})
chunksPerTenant = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "ingester_chunks_stored_total",
Help: "Total stored chunks per tenant.",
}, []string{"tenant"})
chunkSizePerTenant = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "ingester_chunk_stored_bytes_total",
Help: "Total bytes stored in chunks per tenant.",
}, []string{"tenant"})
chunkAge = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "ingester_chunk_age_seconds",
Help: "Distribution of chunk ages (when stored).",
// with default settings chunks should flush between 5 min and 12 hours
// so buckets at 1min, 5min, 10min, 30min, 1hr, 2hr, 4hr, 10hr, 12hr, 16hr
Buckets: []float64{60, 300, 600, 1800, 3600, 7200, 14400, 36000, 43200, 57600},
})
chunkEncodeTime = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "ingester_chunk_encode_time_seconds",
Help: "Distribution of chunk encode times.",
// 10ms to 10s.
Buckets: prometheus.ExponentialBuckets(0.01, 4, 6),
})
chunksFlushedPerReason = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "ingester_chunks_flushed_total",
Help: "Total flushed chunks per reason.",
}, []string{"reason"})
chunkLifespan = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "ingester_chunk_bounds_hours",
Help: "Distribution of chunk end-start durations.",
// 1h -> 8hr
Buckets: prometheus.LinearBuckets(1, 1, 8),
})
flushedChunksStats = usagestats.NewCounter("ingester_flushed_chunks")
flushedChunksBytesStats = usagestats.NewStatistics("ingester_flushed_chunks_bytes")
flushedChunksLinesStats = usagestats.NewStatistics("ingester_flushed_chunks_lines")
flushedChunksAgeStats = usagestats.NewStatistics("ingester_flushed_chunks_age_seconds")
flushedChunksLifespanStats = usagestats.NewStatistics("ingester_flushed_chunks_lifespan_seconds")
flushedChunksUtilizationStats = usagestats.NewStatistics("ingester_flushed_chunks_utilization")
)

const (
// Backoff for retrying 'immediate' flushes. Only counts for queue
// position, not wallclock time.
@@ -324,7 +247,7 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRe
stream.chunks[0].chunk = nil // erase reference so the chunk can be garbage-collected
stream.chunks = stream.chunks[1:]
}
- memoryChunks.Sub(float64(prevNumChunks - len(stream.chunks)))
+ i.metrics.memoryChunks.Sub(float64(prevNumChunks - len(stream.chunks)))

// Signal how much data has been flushed to lessen any WAL replay pressure.
i.replayController.Sub(int64(subtracted))
@@ -360,8 +283,8 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
labelsBuilder.Set(nameLabel, logsValue)
metric := labelsBuilder.Labels()

- sizePerTenant := chunkSizePerTenant.WithLabelValues(userID)
- countPerTenant := chunksPerTenant.WithLabelValues(userID)
+ sizePerTenant := i.metrics.chunkSizePerTenant.WithLabelValues(userID)
+ countPerTenant := i.metrics.chunksPerTenant.WithLabelValues(userID)

for j, c := range cs {
if err := i.closeChunk(c, chunkMtx); err != nil {
@@ -430,7 +353,7 @@ func (i *Ingester) encodeChunk(ctx context.Context, ch *chunk.Chunk, desc *chunk
if err := ch.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkBytesSize))); err != nil {
return fmt.Errorf("chunk encoding: %w", err)
}
- chunkEncodeTime.Observe(time.Since(start).Seconds())
+ i.metrics.chunkEncodeTime.Observe(time.Since(start).Seconds())
return nil
}

@@ -443,7 +366,7 @@ func (i *Ingester) flushChunk(ctx context.Context, ch *chunk.Chunk) error {
if err := i.store.Put(ctx, []chunk.Chunk{*ch}); err != nil {
return fmt.Errorf("store put chunk: %w", err)
}
- flushedChunksStats.Inc(1)
+ i.metrics.flushedChunksStats.Inc(1)
return nil
}

@@ -455,30 +378,30 @@ func (i *Ingester) reportFlushedChunkStatistics(ch *chunk.Chunk, desc *chunkDesc
return
}

- chunksFlushedPerReason.WithLabelValues(reason).Add(1)
+ i.metrics.chunksFlushedPerReason.WithLabelValues(reason).Add(1)

compressedSize := float64(len(byt))
uncompressedSize, ok := chunkenc.UncompressedSize(ch.Data)

if ok && compressedSize > 0 {
- chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
+ i.metrics.chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
}

utilization := ch.Data.Utilization()
- chunkUtilization.Observe(utilization)
+ i.metrics.chunkUtilization.Observe(utilization)
numEntries := desc.chunk.Size()
- chunkEntries.Observe(float64(numEntries))
- chunkSize.Observe(compressedSize)
+ i.metrics.chunkEntries.Observe(float64(numEntries))
+ i.metrics.chunkSize.Observe(compressedSize)
sizePerTenant.Add(compressedSize)
countPerTenant.Inc()

boundsFrom, boundsTo := desc.chunk.Bounds()
- chunkAge.Observe(time.Since(boundsFrom).Seconds())
- chunkLifespan.Observe(boundsTo.Sub(boundsFrom).Hours())
-
- flushedChunksBytesStats.Record(compressedSize)
- flushedChunksLinesStats.Record(float64(numEntries))
- flushedChunksUtilizationStats.Record(utilization)
- flushedChunksAgeStats.Record(time.Since(boundsFrom).Seconds())
- flushedChunksLifespanStats.Record(boundsTo.Sub(boundsFrom).Hours())
+ i.metrics.chunkAge.Observe(time.Since(boundsFrom).Seconds())
+ i.metrics.chunkLifespan.Observe(boundsTo.Sub(boundsFrom).Hours())
+
+ i.metrics.flushedChunksBytesStats.Record(compressedSize)
+ i.metrics.flushedChunksLinesStats.Record(float64(numEntries))
+ i.metrics.flushedChunksUtilizationStats.Record(utilization)
+ i.metrics.flushedChunksAgeStats.Record(time.Since(boundsFrom).Seconds())
+ i.metrics.flushedChunksLifespanStats.Record(boundsTo.Sub(boundsFrom).Hours())
}
2 changes: 1 addition & 1 deletion pkg/ingester/instance.go
@@ -136,7 +136,7 @@ func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *lo

err := s.consumeChunk(ctx, chunk)
if err == nil {
- memoryChunks.Inc()
+ i.metrics.memoryChunks.Inc()
}

return err
2 changes: 1 addition & 1 deletion pkg/ingester/instance_test.go
@@ -43,7 +43,7 @@ func TestLabelsCollisions(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

- i := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, nil, &OnceSwitch{}, nil)
+ i := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)

// avoid entries from the future.
tt := time.Now().Add(-5 * time.Minute)
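The only test change is that newInstance now receives NilMetrics where it previously received nil, since the instance code above dereferences its metrics field. The diff does not show how NilMetrics is defined; a plausible sketch, relying on the documented behaviour that promauto.With(nil) returns a factory whose collectors are never registered with any registry:

package ingester

// Hypothetical definition of a metrics value for tests that don't care about
// metric output: every collector is built via promauto.With(nil), so nothing
// is registered and nothing can collide with metrics registered elsewhere in
// the test binary. Unregistered collectors can still be incremented and read.
var NilMetrics = newIngesterMetrics(nil)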
119 changes: 119 additions & 0 deletions pkg/ingester/metrics.go
@@ -4,6 +4,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

+ "github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/validation"
)

@@ -35,6 +36,29 @@ type ingesterMetrics struct {
limiterEnabled prometheus.Gauge

autoForgetUnhealthyIngestersTotal prometheus.Counter

chunkUtilization prometheus.Histogram
memoryChunks prometheus.Gauge
chunkEntries prometheus.Histogram
chunkSize prometheus.Histogram
chunkCompressionRatio prometheus.Histogram
chunksPerTenant *prometheus.CounterVec
chunkSizePerTenant *prometheus.CounterVec
chunkAge prometheus.Histogram
chunkEncodeTime prometheus.Histogram
chunksFlushedPerReason *prometheus.CounterVec
chunkLifespan prometheus.Histogram
flushedChunksStats *usagestats.Counter
flushedChunksBytesStats *usagestats.Statistics
flushedChunksLinesStats *usagestats.Statistics
flushedChunksAgeStats *usagestats.Statistics
flushedChunksLifespanStats *usagestats.Statistics
flushedChunksUtilizationStats *usagestats.Statistics

chunksCreatedTotal prometheus.Counter
samplesPerChunk prometheus.Histogram
blocksPerChunk prometheus.Histogram
chunkCreatedStats *usagestats.Counter
}

// setRecoveryBytesInUse bounds the bytes reports to >= 0.
@@ -148,5 +172,100 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
Name: "loki_ingester_autoforget_unhealthy_ingesters_total",
Help: "Total number of ingesters automatically forgotten",
}),
chunkUtilization: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "ingester_chunk_utilization",
Help: "Distribution of stored chunk utilization (when stored).",
Buckets: prometheus.LinearBuckets(0, 0.2, 6),
}),
memoryChunks: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: "loki",
Name: "ingester_memory_chunks",
Help: "The total number of chunks in memory.",
}),
chunkEntries: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "ingester_chunk_entries",
Help: "Distribution of stored lines per chunk (when stored).",
Buckets: prometheus.ExponentialBuckets(200, 2, 9), // biggest bucket is 200*2^(9-1) = 51200
}),
chunkSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "ingester_chunk_size_bytes",
Help: "Distribution of stored chunk sizes (when stored).",
Buckets: prometheus.ExponentialBuckets(20000, 2, 10), // biggest bucket is 20000*2^(10-1) = 10,240,000 (~10.2MB)
}),
chunkCompressionRatio: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "ingester_chunk_compression_ratio",
Help: "Compression ratio of chunks (when stored).",
Buckets: prometheus.LinearBuckets(.75, 2, 10),
}),
chunksPerTenant: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "ingester_chunks_stored_total",
Help: "Total stored chunks per tenant.",
}, []string{"tenant"}),
chunkSizePerTenant: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "ingester_chunk_stored_bytes_total",
Help: "Total bytes stored in chunks per tenant.",
}, []string{"tenant"}),
chunkAge: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "ingester_chunk_age_seconds",
Help: "Distribution of chunk ages (when stored).",
// with default settings chunks should flush between 5 min and 12 hours
// so buckets at 1min, 5min, 10min, 30min, 1hr, 2hr, 4hr, 10hr, 12hr, 16hr
Buckets: []float64{60, 300, 600, 1800, 3600, 7200, 14400, 36000, 43200, 57600},
}),
chunkEncodeTime: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "ingester_chunk_encode_time_seconds",
Help: "Distribution of chunk encode times.",
// 10ms to 10s.
Buckets: prometheus.ExponentialBuckets(0.01, 4, 6),
}),
chunksFlushedPerReason: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "ingester_chunks_flushed_total",
Help: "Total flushed chunks per reason.",
}, []string{"reason"}),
chunkLifespan: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "ingester_chunk_bounds_hours",
Help: "Distribution of chunk end-start durations.",
// 1h -> 8hr
Buckets: prometheus.LinearBuckets(1, 1, 8),
}),
flushedChunksStats: usagestats.NewCounter("ingester_flushed_chunks"),
flushedChunksBytesStats: usagestats.NewStatistics("ingester_flushed_chunks_bytes"),
flushedChunksLinesStats: usagestats.NewStatistics("ingester_flushed_chunks_lines"),
flushedChunksAgeStats: usagestats.NewStatistics("ingester_flushed_chunks_age_seconds"),
flushedChunksLifespanStats: usagestats.NewStatistics("ingester_flushed_chunks_lifespan_seconds"),
flushedChunksUtilizationStats: usagestats.NewStatistics("ingester_flushed_chunks_utilization"),
chunksCreatedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Name: "ingester_chunks_created_total",
Help: "The total number of chunks created in the ingester.",
}),
samplesPerChunk: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Subsystem: "ingester",
Name: "samples_per_chunk",
Help: "The number of samples in a chunk.",

Buckets: prometheus.LinearBuckets(4096, 2048, 6),
}),
blocksPerChunk: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Subsystem: "ingester",
Name: "blocks_per_chunk",
Help: "The number of blocks in a chunk.",

Buckets: prometheus.ExponentialBuckets(5, 2, 6),
}),

chunkCreatedStats: usagestats.NewCounter("ingester_chunk_created"),
}
}
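With every chunk and flush metric now hanging off ingesterMetrics, callers can scope them to a registry of their own and read values back directly. A sketch of what that enables in a package-local test, assuming newIngesterMetrics keeps the signature shown above; the testutil usage is illustrative and not part of this commit:

package ingester

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestMemoryChunksGauge(t *testing.T) {
	// A dedicated registry per test: no clashes with other tests or with a
	// second ingester constructed in the same process.
	reg := prometheus.NewRegistry()
	m := newIngesterMetrics(reg)

	// Simulate three chunks held in memory, one of which is then flushed.
	m.memoryChunks.Add(3)
	m.memoryChunks.Sub(1)

	if got := testutil.ToFloat64(m.memoryChunks); got != 2 {
		t.Fatalf("loki_ingester_memory_chunks = %v, want 2", got)
	}
}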
2 changes: 1 addition & 1 deletion pkg/ingester/recovery.go
@@ -98,7 +98,7 @@ func (r *ingesterRecoverer) Series(series *Series) error {
if err != nil {
return err
}
- memoryChunks.Add(float64(len(series.Chunks)))
+ r.ing.metrics.memoryChunks.Add(float64(len(series.Chunks)))
r.ing.metrics.recoveredChunksTotal.Add(float64(len(series.Chunks)))
r.ing.metrics.recoveredEntriesTotal.Add(float64(entriesAdded))
r.ing.replayController.Add(int64(bytesAdded))