feat: Blooms retention #12258

Merged 17 commits on Mar 28, 2024

Changes from 9 commits
9 changes: 9 additions & 0 deletions docs/sources/configure/_index.md
@@ -2729,6 +2729,15 @@ ring:
# and compact as many tables.
# CLI flag: -bloom-compactor.max-compaction-parallelism
[max_compaction_parallelism: <int> | default = 1]

retention:
  # Enable bloom retention.
  # CLI flag: -bloom-compactor.retention.enabled
  [enabled: <boolean> | default = false]

  # Max lookback days for retention.
  # CLI flag: -bloom-compactor.retention.max-lookback-days
  [max_lookback_days: <int> | default = 365]
```

### limits_config
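For orientation, a minimal configuration enabling the new `retention` block documented above might look like the sketch below. The enclosing `bloom_compactor:` key and the shortened 90-day lookback are illustrative assumptions, not part of this diff; the defaults remain `false` and `365`.

```yaml
# Illustrative only: enable bloom retention with a 90-day lookback.
# Equivalent CLI flags: -bloom-compactor.retention.enabled,
# -bloom-compactor.retention.max-lookback-days.
bloom_compactor:
  retention:
    enabled: true
    max_lookback_days: 90
```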
28 changes: 23 additions & 5 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -48,7 +48,8 @@ type Compactor struct {

tsdbStore TSDBStore
// TODO(owen-d): ShardingStrategy
controller *SimpleBloomController
controller *SimpleBloomController
retentionManager *RetentionManager

// temporary workaround until bloomStore has implemented read/write shipper interface
bloomStore bloomshipper.Store
@@ -65,7 +66,8 @@ func New(
storeCfg storage.Config,
clientMetrics storage.ClientMetrics,
fetcherProvider stores.ChunkFetcherProvider,
sharding util_ring.TenantSharding,
ring ring.ReadRing,
ringLifeCycler *ring.BasicLifecycler,
limits Limits,
store bloomshipper.Store,
logger log.Logger,
@@ -75,7 +77,7 @@
cfg: cfg,
schemaCfg: schemaCfg,
logger: logger,
sharding: sharding,
sharding: util_ring.NewTenantShuffleSharding(ring, ringLifeCycler, limits.BloomCompactorShardSize),
limits: limits,
bloomStore: store,
}
@@ -104,6 +106,15 @@ func New(
c.logger,
)

c.retentionManager = NewRetentionManager(
c.cfg.RetentionConfig,
c.limits,
c.bloomStore,
newFirstTokenRetentionSharding(ring, ringLifeCycler),
c.metrics,
c.logger,
)

c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
return c, nil
}
@@ -218,10 +229,17 @@ func (c *Compactor) runOne(ctx context.Context) error {
c.metrics.compactionsStarted.Inc()
start := time.Now()
level.Info(c.logger).Log("msg", "running bloom compaction", "workers", c.cfg.WorkerParallelism)
var workersErr error
var workersErr, retentionErr error
var wg sync.WaitGroup
input := make(chan *tenantTableRange)

// Launch retention (will return instantly if retention is disabled or not owned by this compactor)
wg.Add(1)
go func() {
retentionErr = c.retentionManager.Apply(ctx)
wg.Done()
}()

tables := c.tables(time.Now())
level.Debug(c.logger).Log("msg", "loaded tables", "tables", tables.TotalDays())

@@ -240,7 +258,7 @@

wg.Wait()
duration := time.Since(start)
err = multierror.New(workersErr, err, ctx.Err()).Err()
err = multierror.New(retentionErr, workersErr, err, ctx.Err()).Err()

if err != nil {
level.Error(c.logger).Log("msg", "compaction iteration failed", "err", err, "duration", duration)
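The `runOne` change above amounts to a small fan-out: retention is launched in its own goroutine next to the table workers, and its error is folded into the combined result. A stripped-down sketch of that pattern follows; `applyRetention` and `runWorkers` are placeholder callbacks, not the actual Loki functions, and the `multierror` import path is assumed from dskit.

```go
// Sketch of the error-aggregation pattern used in runOne: retention runs
// concurrently with the compaction workers; both errors plus ctx.Err() are
// combined, mirroring the multierror.New(...) call in the diff.
package bloomcompactor // illustrative placement

import (
	"context"
	"sync"

	"github.com/grafana/dskit/multierror" // import path assumed
)

func runOneSketch(ctx context.Context, applyRetention, runWorkers func(context.Context) error) error {
	var wg sync.WaitGroup
	var retentionErr, workersErr error

	// Retention returns immediately when it is disabled or this compactor
	// does not own the retention task, so launching it is cheap.
	wg.Add(1)
	go func() {
		defer wg.Done()
		retentionErr = applyRetention(ctx)
	}()

	// Compaction work happens in the foreground while retention runs.
	workersErr = runWorkers(ctx)

	wg.Wait()
	return multierror.New(retentionErr, workersErr, ctx.Err()).Err()
}
```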
8 changes: 8 additions & 0 deletions pkg/bloomcompactor/bloomcompactor_test.go
@@ -149,6 +149,14 @@ type mockLimits struct {
shardSize int
}

func (m mockLimits) RetentionPeriod(_ string) time.Duration {
panic("implement me")
}

func (m mockLimits) StreamRetention(_ string) []validation.StreamRetention {
panic("implement me")
}

func (m mockLimits) AllByUserID() map[string]*validation.Limits {
panic("implement me")
}
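The mock above panics because the existing tests never reach the retention code path; a test that does exercise retention would need concrete values instead. A hypothetical variant, meant to live in the same test file and reuse its imports (field names are illustrative):

```go
// Hypothetical retention-aware stub: embeds the existing mockLimits and
// overrides only the two new RetentionLimits methods with fixed values.
type retentionMockLimits struct {
	mockLimits
	retention       time.Duration
	streamRetention []validation.StreamRetention
}

func (m retentionMockLimits) RetentionPeriod(_ string) time.Duration {
	return m.retention
}

func (m retentionMockLimits) StreamRetention(_ string) []validation.StreamRetention {
	return m.streamRetention
}
```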
10 changes: 8 additions & 2 deletions pkg/bloomcompactor/config.go
@@ -7,7 +7,6 @@ import (

"github.com/pkg/errors"

"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads"
"github.com/grafana/loki/pkg/util/ring"
)

@@ -32,6 +31,8 @@ type Config struct {
CompactionRetries int `yaml:"compaction_retries"`

MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`

RetentionConfig RetentionConfig `yaml:"retention"`
}

// RegisterFlags registers flags for the Bloom-Compactor configuration.
@@ -53,6 +54,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.RetryMaxBackoff, "bloom-compactor.compaction-retries-max-backoff", time.Minute, "Maximum backoff time between retries.")
f.IntVar(&cfg.CompactionRetries, "bloom-compactor.compaction-retries", 3, "Number of retries to perform when compaction fails.")
f.IntVar(&cfg.MaxCompactionParallelism, "bloom-compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
cfg.RetentionConfig.RegisterFlags(f)

// Ring
skipFlags := []string{
@@ -67,6 +69,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
}

func (cfg *Config) Validate() error {
if err := cfg.RetentionConfig.Validate(); err != nil {
return err
}

if cfg.MinTableCompactionPeriod > cfg.MaxTableCompactionPeriod {
return fmt.Errorf("min_compaction_age must be less than or equal to max_compaction_age")
}
@@ -77,7 +83,7 @@
}

type Limits interface {
downloads.Limits
RetentionLimits
BloomCompactorShardSize(tenantID string) int
BloomCompactorEnabled(tenantID string) bool
BloomNGramLength(tenantID string) int
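The `RetentionConfig` type itself is outside this diff view, but the documented flags and the `RegisterFlags`/`Validate` calls above constrain its shape. The sketch below is an assumption of what it could look like; the field names and the exact validation rule are guesses, while the flag names, defaults, and help texts come from the docs change.

```go
// Assumed shape of the retention config wired in above; the real struct in
// the PR may differ. Flag names and defaults match the documentation diff.
package bloomcompactor // illustrative placement

import (
	"flag"
	"fmt"
)

type RetentionConfig struct {
	Enabled         bool `yaml:"enabled"`
	MaxLookbackDays int  `yaml:"max_lookback_days"`
}

func (cfg *RetentionConfig) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&cfg.Enabled, "bloom-compactor.retention.enabled", false, "Enable bloom retention.")
	f.IntVar(&cfg.MaxLookbackDays, "bloom-compactor.retention.max-lookback-days", 365, "Max lookback days for retention.")
}

func (cfg *RetentionConfig) Validate() error {
	// Assumed rule: the lookback must be positive when retention is enabled.
	if cfg.Enabled && cfg.MaxLookbackDays <= 0 {
		return fmt.Errorf("max-lookback-days must be greater than 0 when retention is enabled")
	}
	return nil
}
```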
40 changes: 40 additions & 0 deletions pkg/bloomcompactor/metrics.go
@@ -43,6 +43,12 @@ type Metrics struct {

progress prometheus.Gauge
timePerTenant *prometheus.CounterVec

// Retention metrics
retentionRunning prometheus.Gauge
retentionTime *prometheus.HistogramVec
retentionDaysPerIteration *prometheus.HistogramVec
retentionTenantsPerIteration *prometheus.HistogramVec
}

func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
@@ -175,6 +181,40 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
Name: "tenant_compaction_seconds_total",
Help: "Time spent processing a tenant.",
}, []string{tenantLabel}),

// Retention
retentionRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_running",
Help: "1 if retention is running in this compactor.",
}),

retentionTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_time_seconds",
Help: "Time this retention process took to complete.",
Buckets: prometheus.DefBuckets,
}, []string{"status"}),

retentionDaysPerIteration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_days_processed",
Help: "Number of days iterated over during the retention process.",
		// 1 day -> 5 years, 10 buckets
Buckets: prometheus.ExponentialBucketsRange(1, 365*5, 10),
}, []string{"status"}),

retentionTenantsPerIteration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_tenants_processed",
Help: "Number of tenants on which retention was applied during the retention process.",
// 1 tenant -> 10k tenants, 10 buckets
Buckets: prometheus.ExponentialBucketsRange(1, 10000, 10),
}, []string{"status"}),
}

return &m
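The retention metrics above are a plain gauge plus three histograms keyed by a status label. How they are driven is not visible in this diff, so the following is only a guess at the surrounding instrumentation, written as a fragment for the same package and relying on its existing imports; `work` stands in for the actual retention logic and the `"success"`/`"failure"` label values are assumptions.

```go
// Illustrative instrumentation for a retention run: raise the gauge while
// running, then record duration, days, and tenants under a status label.
func applyRetentionWithMetrics(ctx context.Context, m *Metrics, work func(context.Context) (days, tenants int, err error)) error {
	m.retentionRunning.Set(1)
	defer m.retentionRunning.Set(0)

	start := time.Now()
	days, tenants, err := work(ctx)

	status := "success"
	if err != nil {
		status = "failure"
	}
	m.retentionTime.WithLabelValues(status).Observe(time.Since(start).Seconds())
	m.retentionDaysPerIteration.WithLabelValues(status).Observe(float64(days))
	m.retentionTenantsPerIteration.WithLabelValues(status).Observe(float64(tenants))
	return err
}
```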