Skip to content

Commit

Permalink
feat: Collect duplicate log line metrics (#13084)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul1r authored Jun 26, 2024
1 parent 7e19cc7 commit 40ee766
Show file tree
Hide file tree
Showing 29 changed files with 376 additions and 112 deletions.
8 changes: 8 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3805,6 +3805,14 @@ These are values which allow you to control aspects of Loki's operation, most co
# CLI flag: -operation-config.log-push-request-streams
[log_push_request_streams: <boolean> | default = false]
# Log metrics for duplicate lines received.
# CLI flag: -operation-config.log-duplicate-metrics
[log_duplicate_metrics: <boolean> | default = false]
# Log stream info for duplicate lines received.
# CLI flag: -operation-config.log-duplicate-stream-info
[log_duplicate_stream_info: <boolean> | default = false]
# Log push errors with a rate limited logger, will show client push errors
# without overly spamming logs.
# CLI flag: -operation-config.limited-log-push-errors
Expand Down
9 changes: 5 additions & 4 deletions pkg/chunkenc/dumb_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,18 @@ func (c *dumbChunk) SpaceFor(_ *logproto.Entry) bool {
return len(c.entries) < tmpNumEntries
}

func (c *dumbChunk) Append(entry *logproto.Entry) error {
// Append adds the entry to the chunk. The dumbChunk does not check for
// duplicates, so the returned duplicate flag is always false.
func (c *dumbChunk) Append(entry *logproto.Entry) (bool, error) {
if len(c.entries) == tmpNumEntries {
return ErrChunkFull
return false, ErrChunkFull
}

if len(c.entries) > 0 && c.entries[len(c.entries)-1].Timestamp.After(entry.Timestamp) {
return ErrOutOfOrder
return false, ErrOutOfOrder
}

c.entries = append(c.entries, *entry)
return nil
return false, nil
}

func (c *dumbChunk) Size() int {
Expand Down
3 changes: 2 additions & 1 deletion pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func SupportedEncoding() string {
type Chunk interface {
Bounds() (time.Time, time.Time)
SpaceFor(*logproto.Entry) bool
Append(*logproto.Entry) error
// Append adds the entry to the chunk. The returned bool is true if the
// entry was detected as a duplicate.
Append(*logproto.Entry) (bool, error)
Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error)
SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator
// Returns the list of blocks in the chunks.
Expand Down
25 changes: 14 additions & 11 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,10 @@ func (hb *headBlock) Reset() {

func (hb *headBlock) Bounds() (int64, int64) { return hb.mint, hb.maxt }

func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) error {
// Append adds the line to the head block. The headBlock does not check for
// duplicates, so the returned duplicate flag is always false.
func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) (bool, error) {
if !hb.IsEmpty() && hb.maxt > ts {
return ErrOutOfOrder
return false, ErrOutOfOrder
}

hb.entries = append(hb.entries, entry{t: ts, s: line})
Expand All @@ -193,7 +194,7 @@ func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) error {
hb.maxt = ts
hb.size += len(line)

return nil
return false, nil
}

func (hb *headBlock) Serialise(pool WriterPool) ([]byte, error) {
Expand Down Expand Up @@ -340,7 +341,7 @@ func (hb *headBlock) Convert(version HeadBlockFmt, symbolizer *symbolizer) (Head
out := version.NewBlock(symbolizer)

for _, e := range hb.entries {
if err := out.Append(e.t, e.s, e.structuredMetadata); err != nil {
if _, err := out.Append(e.t, e.s, e.structuredMetadata); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -834,27 +835,29 @@ func (c *MemChunk) Utilization() float64 {
}

// Append implements Chunk.
func (c *MemChunk) Append(entry *logproto.Entry) error {
// The returned bool is the duplicate flag reported by the head block for this
// entry, except that it is always false when the append triggers a block cut.
func (c *MemChunk) Append(entry *logproto.Entry) (bool, error) {
entryTimestamp := entry.Timestamp.UnixNano()

// If the head block is empty but there are cut blocks, we have to make
// sure the new entry is not out of order compared to the previous block
if c.headFmt < UnorderedHeadBlockFmt && c.head.IsEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp {
return ErrOutOfOrder
return false, ErrOutOfOrder
}

if c.format < ChunkFormatV4 {
entry.StructuredMetadata = nil
}
if err := c.head.Append(entryTimestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)); err != nil {
return err
dup, err := c.head.Append(entryTimestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata))
if err != nil {
return dup, err
}

if c.head.UncompressedSize() >= c.blockSize {
return c.cut()
return false, c.cut()
}

return nil
return dup, nil
}

// Close implements Chunk.
Expand Down Expand Up @@ -1122,7 +1125,7 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err
if filter != nil && filter(entry.Timestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)...) {
continue
}
if err := newChunk.Append(&entry); err != nil {
if _, err := newChunk.Append(&entry); err != nil {
return nil, err
}
}
Expand Down
Loading

0 comments on commit 40ee766

Please sign in to comment.