Adds a counter for total flushed chunks per reason. (#1819)
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
cyriltovena authored Mar 18, 2020
1 parent ba53328 commit d5f27ee
Showing 2 changed files with 33 additions and 10 deletions.
34 changes: 27 additions & 7 deletions pkg/ingester/flush.go
@@ -76,6 +76,11 @@ var (
 		// 10ms to 10s.
 		Buckets: prometheus.ExponentialBuckets(0.01, 4, 6),
 	})
+	chunksFlushedPerReason = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "loki",
+		Name:      "ingester_chunks_flushed_total",
+		Help:      "Total flushed chunks per reason.",
+	}, []string{"reason"})
 )
 
 const (
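
For context, the added metric uses the standard client_golang pattern: promauto.NewCounterVec registers the vector with the default registry at package init, and each flush then increments exactly one labelled child. A minimal, self-contained sketch of that pattern (the main package, port, and handler wiring are illustrative, not part of this commit):

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Same registration pattern as the commit: a counter vector
// partitioned by a single "reason" label.
var chunksFlushed = promauto.NewCounterVec(prometheus.CounterOpts{
	Namespace: "loki",
	Name:      "ingester_chunks_flushed_total",
	Help:      "Total flushed chunks per reason.",
}, []string{"reason"})

func main() {
	// One increment per flushed chunk, labelled with why it was flushed.
	chunksFlushed.WithLabelValues("idle").Inc()

	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":9090", nil))
}
```

Once scraped, per-reason flush rates can be broken down with a query such as sum by (reason) (rate(loki_ingester_chunks_flushed_total[5m])).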
@@ -85,6 +90,12 @@ const (
 
 	nameLabel = "__name__"
 	logsValue = "logs"
+
+	flushReasonIdle   = "idle"
+	flushReasonMaxAge = "max_age"
+	flushReasonForced = "forced"
+	flushReasonFull   = "full"
+	flushReasonSynced = "synced"
 )
 
 // Flush triggers a flush of all the chunks and closes the flush queues.
@@ -147,7 +158,8 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo
 	}
 
 	lastChunk := stream.chunks[len(stream.chunks)-1]
-	if len(stream.chunks) == 1 && !immediate && !i.shouldFlushChunk(&lastChunk) {
+	shouldFlush, _ := i.shouldFlushChunk(&lastChunk)
+	if len(stream.chunks) == 1 && !immediate && !shouldFlush {
 		return
 	}
 
@@ -226,35 +238,43 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint
 
 	var result []*chunkDesc
 	for j := range stream.chunks {
-		if immediate || i.shouldFlushChunk(&stream.chunks[j]) {
+		shouldFlush, reason := i.shouldFlushChunk(&stream.chunks[j])
+		if immediate || shouldFlush {
 			// Ensure no more writes happen to this chunk.
 			if !stream.chunks[j].closed {
 				stream.chunks[j].closed = true
 			}
 			// Flush this chunk if it hasn't already been successfully flushed.
 			if stream.chunks[j].flushed.IsZero() {
 				result = append(result, &stream.chunks[j])
+				if immediate {
+					reason = flushReasonForced
+				}
+				chunksFlushedPerReason.WithLabelValues(reason).Add(1)
 			}
 		}
 	}
 	return result, stream.labels
 }
 
-func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) bool {
+func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) (bool, string) {
 	// Append should close the chunk when a new one is added.
 	if chunk.closed {
-		return true
+		if chunk.synced {
+			return true, flushReasonSynced
+		}
+		return true, flushReasonFull
 	}
 
 	if time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle {
-		return true
+		return true, flushReasonIdle
 	}
 
 	if from, to := chunk.chunk.Bounds(); to.Sub(from) > i.cfg.MaxChunkAge {
-		return true
+		return true, flushReasonMaxAge
 	}
 
-	return false
+	return false, ""
 }
 
 func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {
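
With this change shouldFlushChunk doubles as a classifier: it returns both the flush decision and the reason that labels the counter, checking closed chunks first (synced wins over full), then idleness, then total age. A self-contained sketch of that decision order, with simplified types and hypothetical thresholds standing in for i.cfg (not the commit's actual code):

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for the ingester's chunkDesc and config.
type chunkDesc struct {
	closed      bool
	synced      bool
	lastUpdated time.Time
	from, to    time.Time
}

const (
	maxChunkIdle = 30 * time.Minute // hypothetical i.cfg.MaxChunkIdle
	maxChunkAge  = time.Hour        // hypothetical i.cfg.MaxChunkAge
)

// shouldFlushChunk mirrors the commit's decision order: closed chunks
// first (synced beats full), then idleness, then total chunk age.
func shouldFlushChunk(c *chunkDesc) (bool, string) {
	if c.closed {
		if c.synced {
			return true, "synced"
		}
		return true, "full"
	}
	if time.Since(c.lastUpdated) > maxChunkIdle {
		return true, "idle"
	}
	if c.to.Sub(c.from) > maxChunkAge {
		return true, "max_age"
	}
	return false, ""
}

func main() {
	flush, reason := shouldFlushChunk(&chunkDesc{closed: true, synced: true})
	fmt.Println(flush, reason) // true synced
}
```

One subtlety visible in collectChunksToFlush above: when a flush is forced (immediate), the reason is overwritten with flushReasonForced, so the counter records why the flush actually happened rather than what the chunk's state alone would suggest.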
9 changes: 6 additions & 3 deletions pkg/ingester/stream.go
@@ -73,6 +73,7 @@ type stream struct {
 type chunkDesc struct {
 	chunk   chunkenc.Chunk
 	closed  bool
+	synced  bool
 	flushed time.Time
 
 	lastUpdated time.Time
@@ -138,7 +139,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePe
 	}
 
 	chunk := &s.chunks[len(s.chunks)-1]
-	if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk.chunk, synchronizePeriod, minUtilization) {
+	if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk, synchronizePeriod, minUtilization) {
 		// If the chunk has no more space call Close to make sure anything in the head block is cut and compressed
 		err := chunk.chunk.Close()
 		if err != nil {
@@ -226,7 +227,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePe
 
 // Returns true if the chunk should be cut before adding a new entry. This is done to make ingesters
 // cut the chunk for this stream at the same moment, so that the new chunk will contain exactly the same entries.
-func (s *stream) cutChunkForSynchronization(entryTimestamp, prevEntryTimestamp time.Time, chunk chunkenc.Chunk, synchronizePeriod time.Duration, minUtilization float64) bool {
+func (s *stream) cutChunkForSynchronization(entryTimestamp, prevEntryTimestamp time.Time, c *chunkDesc, synchronizePeriod time.Duration, minUtilization float64) bool {
 	if synchronizePeriod <= 0 || prevEntryTimestamp.IsZero() {
 		return false
 	}
@@ -239,10 +240,12 @@ func (s *stream) cutChunkForSynchronization(entryTimestamp, prevEntryTimestamp t
 	// if current entry timestamp has rolled over synchronization period
 	if cts < pts {
 		if minUtilization <= 0 {
+			c.synced = true
 			return true
 		}
 
-		if chunk.Utilization() > minUtilization {
+		if c.chunk.Utilization() > minUtilization {
+			c.synced = true
 			return true
 		}
 	}
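
The cts < pts comparison is the rollover test: each timestamp is reduced to its offset within the synchronization period, and when the current entry's offset is smaller than the previous entry's, a period boundary was crossed in between. The diff does not show how cts and pts are computed, so the modulo arithmetic below is an assumed reconstruction:

```go
package main

import (
	"fmt"
	"time"
)

// rolledOver reports whether ts crossed a synchronization-period boundary
// relative to prev, by comparing each timestamp's offset into the period.
// Assumed reconstruction of the cts/pts values compared in the diff.
func rolledOver(prev, ts time.Time, period time.Duration) bool {
	pts := prev.UnixNano() % period.Nanoseconds()
	cts := ts.UnixNano() % period.Nanoseconds()
	return cts < pts
}

func main() {
	period := 10 * time.Minute
	prev := time.Date(2020, 3, 18, 10, 9, 0, 0, time.UTC)  // 9m into its period
	next := time.Date(2020, 3, 18, 10, 11, 0, 0, time.UTC) // 1m into the next one
	fmt.Println(rolledOver(prev, next, period))            // true
}
```

When the cut fires, the chunk is now also marked synced, which is what later makes shouldFlushChunk report flushReasonSynced rather than flushReasonFull for that chunk.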
