From d5f27ee04dcdf6db9c6e8785d9a5a6c6d0a38244 Mon Sep 17 00:00:00 2001
From: Cyril Tovena
Date: Wed, 18 Mar 2020 13:21:43 -0400
Subject: [PATCH] Adds a counter for total flushed chunks per reason. (#1819)

Signed-off-by: Cyril Tovena
---
 pkg/ingester/flush.go  | 34 +++++++++++++++++++++++++++-------
 pkg/ingester/stream.go |  9 ++++++---
 2 files changed, 33 insertions(+), 10 deletions(-)

diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go
index cc06bfb2632d5..d53204fc3ddc3 100644
--- a/pkg/ingester/flush.go
+++ b/pkg/ingester/flush.go
@@ -76,6 +76,11 @@ var (
 		// 10ms to 10s.
 		Buckets: prometheus.ExponentialBuckets(0.01, 4, 6),
 	})
+	chunksFlushedPerReason = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "loki",
+		Name:      "ingester_chunks_flushed_total",
+		Help:      "Total flushed chunks per reason.",
+	}, []string{"reason"})
 )
 
 const (
@@ -85,6 +90,12 @@ const (
 
 	nameLabel = "__name__"
 	logsValue = "logs"
+
+	flushReasonIdle   = "idle"
+	flushReasonMaxAge = "max_age"
+	flushReasonForced = "forced"
+	flushReasonFull   = "full"
+	flushReasonSynced = "synced"
 )
 
 // Flush triggers a flush of all the chunks and closes the flush queues.
@@ -147,7 +158,8 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo
 	}
 
 	lastChunk := stream.chunks[len(stream.chunks)-1]
-	if len(stream.chunks) == 1 && !immediate && !i.shouldFlushChunk(&lastChunk) {
+	shouldFlush, _ := i.shouldFlushChunk(&lastChunk)
+	if len(stream.chunks) == 1 && !immediate && !shouldFlush {
 		return
 	}
 
@@ -226,7 +238,8 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint
 
 	var result []*chunkDesc
 	for j := range stream.chunks {
-		if immediate || i.shouldFlushChunk(&stream.chunks[j]) {
+		shouldFlush, reason := i.shouldFlushChunk(&stream.chunks[j])
+		if immediate || shouldFlush {
 			// Ensure no more writes happen to this chunk.
 			if !stream.chunks[j].closed {
 				stream.chunks[j].closed = true
@@ -234,27 +247,34 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint
 			// Flush this chunk if it hasn't already been successfully flushed.
 			if stream.chunks[j].flushed.IsZero() {
 				result = append(result, &stream.chunks[j])
+				if immediate {
+					reason = flushReasonForced
+				}
+				chunksFlushedPerReason.WithLabelValues(reason).Add(1)
 			}
 		}
 	}
 	return result, stream.labels
 }
 
-func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) bool {
+func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) (bool, string) {
 	// Append should close the chunk when a new one is added.
 	if chunk.closed {
-		return true
+		if chunk.synced {
+			return true, flushReasonSynced
+		}
+		return true, flushReasonFull
 	}
 
 	if time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle {
-		return true
+		return true, flushReasonIdle
 	}
 
 	if from, to := chunk.chunk.Bounds(); to.Sub(from) > i.cfg.MaxChunkAge {
-		return true
+		return true, flushReasonMaxAge
 	}
 
-	return false
+	return false, ""
 }
 
 func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {
diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go
index 3b22b195bfdf8..45d00a50a8eb6 100644
--- a/pkg/ingester/stream.go
+++ b/pkg/ingester/stream.go
@@ -73,6 +73,7 @@ type stream struct {
 type chunkDesc struct {
 	chunk  chunkenc.Chunk
 	closed bool
+	synced bool
 
 	flushed     time.Time
 	lastUpdated time.Time
@@ -138,7 +139,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePe
 		}
 
 		chunk := &s.chunks[len(s.chunks)-1]
-		if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk.chunk, synchronizePeriod, minUtilization) {
+		if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk, synchronizePeriod, minUtilization) {
 			// If the chunk has no more space, call Close to make sure anything in the head block is cut and compressed.
 			err := chunk.chunk.Close()
 			if err != nil {
@@ -226,7 +227,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePe
 
 // Returns true if the chunk should be cut before adding a new entry. This is done to make ingesters
 // cut the chunk for this stream at the same moment, so that the new chunk will contain exactly the same entries.
-func (s *stream) cutChunkForSynchronization(entryTimestamp, prevEntryTimestamp time.Time, chunk chunkenc.Chunk, synchronizePeriod time.Duration, minUtilization float64) bool {
+func (s *stream) cutChunkForSynchronization(entryTimestamp, prevEntryTimestamp time.Time, c *chunkDesc, synchronizePeriod time.Duration, minUtilization float64) bool {
 	if synchronizePeriod <= 0 || prevEntryTimestamp.IsZero() {
 		return false
 	}
@@ -239,10 +240,12 @@ func (s *stream) cutChunkForSynchronization(entryTimestamp, prevEntryTimestamp t
 	// if current entry timestamp has rolled over synchronization period
 	if cts < pts {
 		if minUtilization <= 0 {
+			c.synced = true
 			return true
 		}
 
-		if chunk.Utilization() > minUtilization {
+		if c.chunk.Utilization() > minUtilization {
+			c.synced = true
 			return true
 		}
 	}
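
Note (not part of the patch): the counter added above follows a common client_golang pattern, a single CounterVec partitioned by a low-cardinality "reason" label. Below is a minimal, self-contained sketch of that pattern. It registers against a local registry instead of promauto's default registerer, and the simulated flush reasons are purely illustrative; the metric and label names are copied from the diff.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// Same shape as chunksFlushedPerReason in pkg/ingester/flush.go: one
	// counter family, partitioned by the reason a chunk was flushed.
	chunksFlushed := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "loki",
		Name:      "ingester_chunks_flushed_total",
		Help:      "Total flushed chunks per reason.",
	}, []string{"reason"})
	reg.MustRegister(chunksFlushed)

	// Simulated flush decisions; the reason strings match the constants
	// added in flush.go (idle, max_age, forced, full, synced).
	for _, reason := range []string{"idle", "full", "full", "synced"} {
		chunksFlushed.WithLabelValues(reason).Add(1)
	}

	// Gather and print each series, e.g. to verify the label partitioning
	// in a unit test.
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		for _, m := range mf.GetMetric() {
			fmt.Printf("%s{reason=%q} %v\n",
				mf.GetName(), m.GetLabel()[0].GetValue(), m.GetCounter().GetValue())
		}
	}
}

Once deployed, the per-reason flush rate can be inspected with a PromQL query such as sum by (reason) (rate(loki_ingester_chunks_flushed_total[5m])), which makes it easy to see whether chunks are being cut because they are full, idle, aged out, force-flushed, or cut for synchronization.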