Skip to content

Commit e08b4a7

Browse files
authored
feat: Pattern ingesters add a limiter for high eviction rate (#13464)
1 parent 845359d commit e08b4a7

File tree

13 files changed

+210
-49
lines changed

13 files changed

+210
-49
lines changed

docs/sources/shared/configuration.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,17 @@ pattern_ingester:
583583
# first flush check is delayed by a random time up to 0.8x the flush check
584584
# period. Additionally, there is +/- 1% jitter added to the interval.
585585
# CLI flag: -pattern-ingester.flush-check-period
586-
[flush_check_period: <duration> | default = 30s]
586+
[flush_check_period: <duration> | default = 1m]
587+
588+
# The maximum number of detected pattern clusters that can be created by
589+
# streams.
590+
# CLI flag: -pattern-ingester.max-clusters
591+
[max_clusters: <int> | default = 300]
592+
593+
# The maximum eviction ratio of patterns per stream. Once that ratio is
594+
# reached, the stream will be throttled for pattern detection.
595+
# CLI flag: -pattern-ingester.max-eviction-ratio
596+
[max_eviction_ratio: <float> | default = 0.25]
587597

588598
# Configures the metric aggregation and storage behavior of the pattern
589599
# ingester.

pkg/pattern/chunk/util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88

99
const (
1010
TimeResolution = model.Time(int64(time.Second*10) / 1e6)
11-
MaxChunkTime = 1 * time.Hour
11+
MaxChunkTime = 15 * time.Minute
1212
)
1313

1414
// TruncateTimestamp returns ts with its remainder modulo step removed
// (ts - ts%step), i.e. ts aligned to a multiple of step.
func TruncateTimestamp(ts, step model.Time) model.Time { return ts - ts%step }

pkg/pattern/drain/drain.go

Lines changed: 45 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,14 @@ import (
3636
)
3737

3838
type Config struct {
39-
maxNodeDepth int
40-
LogClusterDepth int
41-
SimTh float64
42-
MaxChildren int
43-
ExtraDelimiters []string
44-
MaxClusters int
45-
ParamString string
39+
maxNodeDepth int
40+
LogClusterDepth int
41+
SimTh float64
42+
MaxChildren int
43+
ExtraDelimiters []string
44+
MaxClusters int
45+
ParamString string
46+
MaxEvictionRatio float64
4647
}
4748

4849
func createLogClusterCache(maxSize int, onEvict func(int, *LogCluster)) *LogClusterCache {
@@ -60,29 +61,13 @@ type LogClusterCache struct {
6061
}
6162

6263
func (c *LogClusterCache) Values() []*LogCluster {
63-
values := make([]*LogCluster, 0)
64-
for _, key := range c.cache.Keys() {
65-
if value, ok := c.cache.Peek(key); ok {
66-
values = append(values, value)
67-
}
68-
}
69-
return values
64+
return c.cache.Values()
7065
}
7166

7267
// Set stores cluster under key by calling Add on the underlying cache.
func (c *LogClusterCache) Set(key int, cluster *LogCluster) {
7368
c.cache.Add(key, cluster)
7469
}
7570

76-
func (c *LogClusterCache) Iterate(fn func(*LogCluster) bool) {
77-
for _, key := range c.cache.Keys() {
78-
if value, ok := c.cache.Peek(key); ok {
79-
if !fn(value) {
80-
return
81-
}
82-
}
83-
}
84-
}
85-
8671
func (c *LogClusterCache) Get(key int) *LogCluster {
8772
cluster, ok := c.cache.Get(key)
8873
if !ok {
@@ -140,10 +125,11 @@ func DefaultConfig() *Config {
140125
// Both SimTh and MaxClusterDepth impact branching factor: the greater
141126
// MaxClusterDepth and SimTh, the less the chance that there will be
142127
// "similar" clusters, but the greater the footprint.
143-
SimTh: 0.3,
144-
MaxChildren: 15,
145-
ParamString: `<_>`,
146-
MaxClusters: 300,
128+
SimTh: 0.3,
129+
MaxChildren: 15,
130+
ParamString: `<_>`,
131+
MaxClusters: 300,
132+
MaxEvictionRatio: 0.25,
147133
}
148134
}
149135

@@ -152,10 +138,17 @@ func New(config *Config, format string, metrics *Metrics) *Drain {
152138
panic("depth argument must be at least 3")
153139
}
154140
config.maxNodeDepth = config.LogClusterDepth - 2
155-
var evictFn func(int, *LogCluster)
156-
if metrics != nil {
157-
evictFn = func(int, *LogCluster) { metrics.PatternsEvictedTotal.Inc() }
141+
142+
d := &Drain{
143+
config: config,
144+
rootNode: createNode(),
145+
metrics: metrics,
146+
maxAllowedLineLength: 3000,
147+
format: format,
158148
}
149+
150+
limiter := newLimiter(config.MaxEvictionRatio)
151+
159152
var tokenizer LineTokenizer
160153
switch format {
161154
case FormatJSON:
@@ -165,16 +158,20 @@ func New(config *Config, format string, metrics *Metrics) *Drain {
165158
default:
166159
tokenizer = newPunctuationTokenizer()
167160
}
168-
169-
d := &Drain{
170-
config: config,
171-
rootNode: createNode(),
172-
idToCluster: createLogClusterCache(config.MaxClusters, evictFn),
173-
metrics: metrics,
174-
tokenizer: tokenizer,
175-
maxAllowedLineLength: 3000,
176-
format: format,
177-
}
161+
d.idToCluster = createLogClusterCache(config.MaxClusters, func(int, *LogCluster) {
162+
if metrics != nil {
163+
if d.pruning {
164+
metrics.PatternsPrunedTotal.Inc()
165+
} else {
166+
metrics.PatternsEvictedTotal.Inc()
167+
}
168+
}
169+
if !d.pruning {
170+
limiter.Evict()
171+
}
172+
})
173+
d.tokenizer = tokenizer
174+
d.limiter = limiter
178175
return d
179176
}
180177

@@ -189,6 +186,8 @@ type Drain struct {
189186
format string
190187
tokens []string
191188
state interface{}
189+
limiter *limiter
190+
pruning bool
192191
}
193192

194193
func (d *Drain) Clusters() []*LogCluster {
@@ -200,6 +199,9 @@ func (d *Drain) TrainTokens(tokens []string, stringer func([]string) string, ts
200199
}
201200

202201
func (d *Drain) Train(content string, ts int64) *LogCluster {
202+
if !d.limiter.Allow() {
203+
return nil
204+
}
203205
if len(content) > d.maxAllowedLineLength {
204206
return nil
205207
}
@@ -325,7 +327,9 @@ func (d *Drain) pruneTree(node *Node) int {
325327
}
326328

327329
// Delete removes cluster from the id-to-cluster cache.
//
// The pruning flag is raised for the duration of the removal so that the
// cache callback installed in New records it as a pruned pattern rather than
// an evicted one and does not report it to the limiter.
// NOTE(review): this presumes Remove invokes that callback synchronously —
// confirm against the cache implementation.
func (d *Drain) Delete(cluster *LogCluster) {
330+
d.pruning = true
328331
d.idToCluster.cache.Remove(cluster.id)
332+
d.pruning = false
329333
}
330334

331335
func (d *Drain) treeSearch(rootNode *Node, tokens []string, simTh float64, includeParams bool) *LogCluster {

pkg/pattern/drain/limiter.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package drain
2+
3+
import (
4+
"time"
5+
)
6+
7+
// limiter compares the number of reported evictions with the number of
// admitted Allow calls and, once evicted/added exceeds maxPercentage, rejects
// further calls until blockedUntil has passed.
type limiter struct {
8+
// added counts the calls admitted by Allow since the last reset.
added int64
9+
// evicted counts the calls to Evict since the last reset.
evicted int64
10+
// maxPercentage is the threshold compared directly against evicted/added,
// so it is a fraction rather than a 0-100 value.
maxPercentage float64
11+
// blockedUntil is the instant before which Allow returns false; the zero
// value means no block is active.
blockedUntil time.Time
12+
}
13+
14+
// newLimiter returns a limiter using maxPercentage as its evicted/added
// threshold, with zeroed counters and no active block.
func newLimiter(maxPercentage float64) *limiter {
15+
return &limiter{
16+
maxPercentage: maxPercentage,
17+
}
18+
}
19+
20+
// Allow reports whether the caller may proceed, counting the call in added
// when it is admitted.
//
// It returns false while a block is active, and also on the call that detects
// evicted/added > maxPercentage (which starts a new block). Once a block has
// expired the counters are reset before the call is evaluated.
func (l *limiter) Allow() bool {
21+
// A non-zero blockedUntil means a block was started earlier.
if !l.blockedUntil.IsZero() {
22+
if time.Now().Before(l.blockedUntil) {
23+
return false
24+
}
25+
// The block has expired: start a fresh measurement window.
l.reset()
26+
}
27+
// Admit the first call of a window unconditionally; this also avoids
// dividing by zero below.
if l.added == 0 {
28+
l.added++
29+
return true
30+
}
31+
// Strictly greater-than: a ratio equal to maxPercentage is still allowed.
if float64(l.evicted)/float64(l.added) > l.maxPercentage {
32+
l.block()
33+
return false
34+
}
35+
l.added++
36+
return true
37+
}
38+
39+
// Evict records one eviction by incrementing the evicted counter.
func (l *limiter) Evict() {
40+
l.evicted++
41+
}
42+
43+
// reset zeroes both counters and clears blockedUntil; maxPercentage is left
// unchanged.
func (l *limiter) reset() {
44+
l.added = 0
45+
l.evicted = 0
46+
l.blockedUntil = time.Time{}
47+
}
48+
49+
// block sets blockedUntil to ten minutes from now, so Allow returns false
// until then.
func (l *limiter) block() {
50+
l.blockedUntil = time.Now().Add(10 * time.Minute)
51+
}

pkg/pattern/drain/limiter_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package drain
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
// TestNewLimiter checks that newLimiter stores the threshold and starts with
// zeroed counters and no block.
func TestNewLimiter(t *testing.T) {
11+
maxPercentage := 0.5
12+
l := newLimiter(maxPercentage)
13+
require.NotNil(t, l, "expected non-nil limiter")
14+
require.Equal(t, maxPercentage, l.maxPercentage, "expected maxPercentage to match")
15+
require.Equal(t, int64(0), l.added, "expected added to be 0")
16+
require.Equal(t, int64(0), l.evicted, "expected evicted to be 0")
17+
require.True(t, l.blockedUntil.IsZero(), "expected blockedUntil to be zero")
18+
}
19+
20+
// TestLimiterAllow walks a limiter with a 0.5 threshold through admission,
// blocking once evictions exceed the threshold, and re-admission after the
// block has expired.
func TestLimiterAllow(t *testing.T) {
21+
maxPercentage := 0.5
22+
l := newLimiter(maxPercentage)
23+
24+
// Test allowing when no evictions
25+
require.True(t, l.Allow(), "expected Allow to return true initially")
26+
27+
// Test allowing until evictions exceed maxPercentage
28+
for i := 0; i < 2; i++ {
29+
require.True(t, l.Allow(), "expected Allow to return true %d", i)
30+
l.Evict()
31+
}
32+
33+
// Evict to exceed maxPercentage
// (after this call evicted is 3 and added is 3, a ratio of 1 > 0.5)
34+
l.Evict()
35+
require.False(t, l.Allow(), "expected Allow to return false after evictions exceed maxPercentage")
36+
37+
// Test blocking time
38+
require.False(t, l.blockedUntil.IsZero(), "expected blockedUntil to be set")
39+
40+
// Fast forward time to simulate block duration passing
// (done by moving blockedUntil into the past rather than sleeping)
41+
l.blockedUntil = time.Now().Add(-1 * time.Minute)
42+
require.True(t, l.Allow(), "expected Allow to return true after block duration")
43+
}
44+
45+
// TestLimiterEvict checks that each Evict call increments evicted by one.
func TestLimiterEvict(t *testing.T) {
46+
l := newLimiter(0.5)
47+
l.Evict()
48+
require.Equal(t, int64(1), l.evicted, "expected evicted to be 1")
49+
l.Evict()
50+
require.Equal(t, int64(2), l.evicted, "expected evicted to be 2")
51+
}
52+
53+
// TestLimiterReset checks that reset zeroes both counters and clears
// blockedUntil.
func TestLimiterReset(t *testing.T) {
54+
l := newLimiter(0.5)
55+
l.added = 10
56+
l.evicted = 5
57+
l.blockedUntil = time.Now().Add(10 * time.Minute)
58+
l.reset()
59+
require.Equal(t, int64(0), l.added, "expected added to be 0")
60+
require.Equal(t, int64(0), l.evicted, "expected evicted to be 0")
61+
require.True(t, l.blockedUntil.IsZero(), "expected blockedUntil to be zero")
62+
}
63+
64+
// TestLimiterBlock checks that block sets blockedUntil to a future time and
// that Allow then returns false.
func TestLimiterBlock(t *testing.T) {
65+
l := newLimiter(0.5)
66+
l.block()
67+
require.False(t, l.blockedUntil.IsZero(), "expected blockedUntil to be set")
68+
require.False(t, l.Allow())
69+
require.True(t, l.blockedUntil.After(time.Now()), "expected blockedUntil to be in the future")
70+
}

pkg/pattern/drain/metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ func DetectLogFormat(line string) string {
2929

3030
type Metrics struct {
3131
PatternsEvictedTotal prometheus.Counter
32+
PatternsPrunedTotal prometheus.Counter
3233
PatternsDetectedTotal prometheus.Counter
3334
TokensPerLine prometheus.Observer
3435
StatePerLine prometheus.Observer

pkg/pattern/ingester.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/grafana/loki/v3/pkg/logproto"
2424
"github.com/grafana/loki/v3/pkg/logql/syntax"
2525
"github.com/grafana/loki/v3/pkg/pattern/clientpool"
26+
"github.com/grafana/loki/v3/pkg/pattern/drain"
2627
"github.com/grafana/loki/v3/pkg/pattern/metric"
2728
"github.com/grafana/loki/v3/pkg/util"
2829
util_log "github.com/grafana/loki/v3/pkg/util/log"
@@ -39,6 +40,8 @@ type Config struct {
3940
ClientConfig clientpool.Config `yaml:"client_config,omitempty" doc:"description=Configures how the pattern ingester will connect to the ingesters."`
4041
ConcurrentFlushes int `yaml:"concurrent_flushes"`
4142
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
43+
MaxClusters int `yaml:"max_clusters,omitempty" doc:"description=The maximum number of detected pattern clusters that can be created by streams."`
44+
MaxEvictionRatio float64 `yaml:"max_eviction_ratio,omitempty" doc:"description=The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will throttled pattern detection."`
4245

4346
MetricAggregation metric.AggregationConfig `yaml:"metric_aggregation,omitempty" doc:"description=Configures the metric aggregation and storage behavior of the pattern ingester."`
4447
// For testing.
@@ -53,7 +56,9 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
5356

5457
fs.BoolVar(&cfg.Enabled, "pattern-ingester.enabled", false, "Flag to enable or disable the usage of the pattern-ingester component.")
5558
fs.IntVar(&cfg.ConcurrentFlushes, "pattern-ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
56-
fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
59+
fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 1*time.Minute, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
60+
fs.IntVar(&cfg.MaxClusters, "pattern-ingester.max-clusters", drain.DefaultConfig().MaxClusters, "The maximum number of detected pattern clusters that can be created by the pattern ingester.")
61+
fs.Float64Var(&cfg.MaxEvictionRatio, "pattern-ingester.max-eviction-ratio", drain.DefaultConfig().MaxEvictionRatio, "The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will be throttled for pattern detection.")
5762
}
5863

5964
func (cfg *Config) Validate() error {
@@ -85,6 +90,7 @@ type Ingester struct {
8590

8691
metrics *ingesterMetrics
8792
chunkMetrics *metric.ChunkMetrics
93+
drainCfg *drain.Config
8894
}
8995

9096
func New(
@@ -97,6 +103,10 @@ func New(
97103
chunkMetrics := metric.NewChunkMetrics(registerer, metricsNamespace)
98104
registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer)
99105

106+
drainCfg := drain.DefaultConfig()
107+
drainCfg.MaxClusters = cfg.MaxClusters
108+
drainCfg.MaxEvictionRatio = cfg.MaxEvictionRatio
109+
100110
i := &Ingester{
101111
cfg: cfg,
102112
logger: log.With(logger, "component", "pattern-ingester"),
@@ -106,6 +116,7 @@ func New(
106116
instances: make(map[string]*instance),
107117
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
108118
loopQuit: make(chan struct{}),
119+
drainCfg: drainCfg,
109120
}
110121
i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
111122
var err error
@@ -357,6 +368,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
357368
i.logger,
358369
i.metrics,
359370
i.chunkMetrics,
371+
i.drainCfg,
360372
i.cfg.MetricAggregation,
361373
)
362374
if err != nil {

pkg/pattern/ingester_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/grafana/loki/v3/pkg/logproto"
1616
"github.com/grafana/loki/v3/pkg/logql/syntax"
17+
"github.com/grafana/loki/v3/pkg/pattern/drain"
1718
"github.com/grafana/loki/v3/pkg/pattern/iter"
1819
"github.com/grafana/loki/v3/pkg/pattern/metric"
1920

@@ -28,6 +29,7 @@ func setup(t *testing.T) *instance {
2829
log.NewNopLogger(),
2930
newIngesterMetrics(nil, "test"),
3031
metric.NewChunkMetrics(nil, "test"),
32+
drain.DefaultConfig(),
3133
metric.AggregationConfig{
3234
Enabled: true,
3335
},

0 commit comments

Comments
 (0)