refactor(blooms): Apply retention in planner (#13484)
salvacorts authored Jul 11, 2024
1 parent 15c8b45 commit 2cc901a
Showing 7 changed files with 1,109 additions and 5 deletions.
5 changes: 5 additions & 0 deletions docs/sources/shared/configuration.md
@@ -645,6 +645,11 @@ bloom_build:
# CLI flag: -bloom-build.planner.max-tasks-per-tenant
[max_queued_tasks_per_tenant: <int> | default = 30000]

retention:
# Enable bloom retention.
# CLI flag: -bloom-build.planner.retention.enabled
[enabled: <boolean> | default = false]

builder:
# The grpc_client block configures the gRPC client used to communicate
# between a client and server component in Loki.
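The hunk above documents the new `retention.enabled` setting (CLI flag `-bloom-build.planner.retention.enabled`, default `false`). The `RetentionConfig` type behind it lives in `pkg/bloombuild/planner/retention.go`, which is collapsed in this view, so the following Go sketch is an assumption about its shape rather than the commit's actual code: it only wires up the one flag documented above.

```go
// Sketch of a minimal RetentionConfig matching the documentation above.
// Only the `enabled` field is confirmed by this diff; everything else about
// the real type (in the collapsed retention.go) is assumed.
package planner

import "flag"

type RetentionConfig struct {
	Enabled bool `yaml:"enabled"`
}

// Called from Config.RegisterFlagsWithPrefix with prefix
// "bloom-build.planner.retention", which yields the documented flag
// -bloom-build.planner.retention.enabled (default false).
func (cfg *RetentionConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.BoolVar(&cfg.Enabled, prefix+".enabled", false, "Enable bloom retention.")
}

// Validate mirrors the call made from Config.Validate below; with a single
// boolean there is nothing to check in this sketch.
func (cfg *RetentionConfig) Validate() error {
	return nil
}
```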
15 changes: 11 additions & 4 deletions pkg/bloombuild/planner/config.go
@@ -8,10 +8,11 @@ import (

// Config configures the bloom-planner component.
type Config struct {
PlanningInterval time.Duration `yaml:"planning_interval"`
MinTableOffset int `yaml:"min_table_offset"`
MaxTableOffset int `yaml:"max_table_offset"`
MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
PlanningInterval time.Duration `yaml:"planning_interval"`
MinTableOffset int `yaml:"min_table_offset"`
MaxTableOffset int `yaml:"max_table_offset"`
MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
RetentionConfig RetentionConfig `yaml:"retention"`
}

// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
@@ -26,17 +27,23 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
// I'm doing it the simple way for now.
f.IntVar(&cfg.MaxTableOffset, prefix+".max-table-offset", 2, "Oldest day-table offset (from today, inclusive) to compact. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.")
f.IntVar(&cfg.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000, "Maximum number of tasks to queue per tenant.")
cfg.RetentionConfig.RegisterFlagsWithPrefix(prefix+".retention", f)
}

func (cfg *Config) Validate() error {
if cfg.MinTableOffset > cfg.MaxTableOffset {
return fmt.Errorf("min-table-offset (%d) must be less than or equal to max-table-offset (%d)", cfg.MinTableOffset, cfg.MaxTableOffset)
}

if err := cfg.RetentionConfig.Validate(); err != nil {
return err
}

return nil
}

type Limits interface {
RetentionLimits
BloomCreationEnabled(tenantID string) bool
BloomSplitSeriesKeyspaceBy(tenantID string) int
BloomBuildMaxBuilders(tenantID string) int
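Two things change in config.go: the planner's `Config` gains a `retention` block that is registered and validated alongside the existing options, and the `Limits` interface now embeds `RetentionLimits`, so whatever implements tenant limits for the planner must also answer retention questions. `RetentionLimits` itself is defined in the collapsed retention file; the sketch below is a guess at the kind of contract it expresses, with an assumed method name, not the commit's actual interface.

```go
package planner

import "time"

// Hypothetical sketch only: the real RetentionLimits lives in the collapsed
// pkg/bloombuild/planner/retention.go. Embedding it in Limits (above) forces
// every limits implementation passed to the planner to expose, per tenant,
// how long bloom data should be kept before retention removes it.
type RetentionLimits interface {
	// Assumed method name and signature, for illustration only.
	RetentionPeriod(tenantID string) time.Duration
}
```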
48 changes: 48 additions & 0 deletions pkg/bloombuild/planner/metrics.go
@@ -42,6 +42,13 @@ type Metrics struct {
tenantsDiscovered prometheus.Counter
tenantTasksPlanned *prometheus.GaugeVec
tenantTasksCompleted *prometheus.GaugeVec

// Retention metrics
retentionRunning prometheus.Gauge
retentionTime *prometheus.HistogramVec
retentionDaysPerIteration *prometheus.HistogramVec
retentionTenantsPerIteration *prometheus.HistogramVec
retentionTenantsExceedingLookback prometheus.Gauge
}

func NewMetrics(
@@ -161,6 +168,47 @@ func NewMetrics(
Name: "tenant_tasks_completed",
Help: "Number of tasks completed for a tenant during the current build iteration.",
}, []string{"tenant", "status"}),

// Retention
retentionRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_running",
Help: "1 if retention is running in this compactor.",
}),

retentionTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_time_seconds",
Help: "Time this retention process took to complete.",
Buckets: prometheus.DefBuckets,
}, []string{"status"}),

retentionDaysPerIteration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_days_processed",
Help: "Number of days iterated over during the retention process.",
// 1day -> 5 years, 10 buckets
Buckets: prometheus.ExponentialBucketsRange(1, 365*5, 10),
}, []string{"status"}),

retentionTenantsPerIteration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_tenants_processed",
Help: "Number of tenants on which retention was applied during the retention process.",
// 1 tenant -> 10k tenants, 10 buckets
Buckets: prometheus.ExponentialBucketsRange(1, 10000, 10),
}, []string{"status"}),

retentionTenantsExceedingLookback: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_tenants_exceeding_lookback",
Help: "Number of tenants with a retention exceeding the configured retention lookback.",
}),
}
}

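The new metrics follow a common shape: a gauge that flags an in-flight retention pass, a duration histogram, and two volume histograms (days and tenants processed), all labelled by outcome. The code that records them is in the collapsed retention file, so the sketch below only illustrates, under that assumption, how metrics like these are typically driven around a single pass; the `apply` callback and the literal status strings stand in for the planner's real retention logic and status constants.

```go
package planner

import (
	"context"
	"time"
)

// instrumentRetention is a sketch, not code from this commit: it shows how the
// retention metrics declared above would typically be recorded around one
// retention pass. The apply callback stands in for the real logic in the
// collapsed retention.go and is assumed to report how many days and tenants
// it touched.
func instrumentRetention(
	ctx context.Context,
	m *Metrics,
	apply func(ctx context.Context) (days, tenants int, err error),
) error {
	m.retentionRunning.Set(1)
	defer m.retentionRunning.Set(0)

	start := time.Now()
	status := "success" // the planner's own status constants would be used here
	days, tenants, err := apply(ctx)
	if err != nil {
		status = "failure"
	}

	m.retentionTime.WithLabelValues(status).Observe(time.Since(start).Seconds())
	m.retentionDaysPerIteration.WithLabelValues(status).Observe(float64(days))
	m.retentionTenantsPerIteration.WithLabelValues(status).Observe(float64(tenants))
	return err
}
```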
21 changes: 20 additions & 1 deletion pkg/bloombuild/planner/planner.go
@@ -36,6 +36,7 @@ type Planner struct {
// Subservices manager.
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
retentionManager *RetentionManager

cfg Config
limits Limits
@@ -91,6 +92,14 @@ func New(
logger: logger,
}

p.retentionManager = NewRetentionManager(
p.cfg.RetentionConfig,
p.limits,
p.bloomStore,
p.metrics,
p.logger,
)

svcs := []services.Service{p.tasksQueue, p.activeUsers}
p.subservices, err = services.NewManager(svcs...)
if err != nil {
@@ -184,6 +193,7 @@ type tenantTable struct {

func (p *Planner) runOne(ctx context.Context) error {
var (
wg sync.WaitGroup
start = time.Now()
status = statusFailure
)
@@ -197,6 +207,16 @@ func (p *Planner) runOne(ctx context.Context) error {
}()

p.metrics.buildStarted.Inc()
level.Info(p.logger).Log("msg", "running bloom build iteration")

// Launch retention (will return instantly if retention is disabled)
wg.Add(1)
go func() {
defer wg.Done()
if err := p.retentionManager.Apply(ctx); err != nil {
level.Error(p.logger).Log("msg", "failed apply retention", "err", err)
}
}()

tables := p.tables(time.Now())
level.Debug(p.logger).Log("msg", "loaded tables", "tables", tables.TotalDays())
@@ -265,7 +285,6 @@ func (p *Planner) runOne(ctx context.Context) error {
// TODO(salvacorts): This may end up creating too many goroutines.
// Create a pool of workers to process table-tenant tuples.
var tasksSucceed atomic.Int64
var wg sync.WaitGroup
for tt, results := range tasksResultForTenantTable {
if results.tasksToWait == 0 {
// No tasks enqueued for this tenant-table tuple, skip processing
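In `runOne`, retention is now launched in its own goroutine at the start of the build iteration, while table planning continues on the main path; the `var wg sync.WaitGroup` that used to be declared further down is hoisted into the initial `var` block so the same group covers both the retention goroutine and the per tenant-table workers added later. A standalone sketch of that pattern, with placeholder work instead of the planner's real retention and task handling:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// applyRetention is a placeholder for RetentionManager.Apply; like the real
// method it is expected to return quickly when retention is disabled.
func applyRetention(ctx context.Context) error { return nil }

func main() {
	ctx := context.Background()
	var wg sync.WaitGroup // one group for the background job and the workers

	// Kick off retention in the background at the start of the iteration.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := applyRetention(ctx); err != nil {
			fmt.Println("failed to apply retention:", err)
		}
	}()

	// Meanwhile the main path keeps planning and spawns its own workers,
	// reusing the same WaitGroup.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			time.Sleep(10 * time.Millisecond) // stand-in for waiting on enqueued tasks
			fmt.Println("tenant-table", i, "done")
		}(i)
	}

	// The iteration only completes once retention and all workers have finished.
	wg.Wait()
	fmt.Println("iteration done")
}
```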
1 change: 1 addition & 0 deletions pkg/bloombuild/planner/planner_test.go
@@ -1019,6 +1019,7 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) {
}

type fakeLimits struct {
Limits
timeout time.Duration
maxRetries int
}
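`fakeLimits` now embeds the `Limits` interface. That is the usual Go idiom for test fakes against a growing interface: the struct satisfies all of `Limits` (including the newly embedded `RetentionLimits`) without stubbing every method, and any method the tests never override simply panics if called, because the embedded value is nil. A small standalone illustration, with made-up method names:

```go
package main

import "fmt"

// Made-up interface standing in for the planner's Limits.
type Limits interface {
	Timeout(tenant string) int
	RetentionEnabled(tenant string) bool // stands in for methods pulled in via RetentionLimits
}

// fakeLimits embeds the interface: it satisfies Limits as a whole, but only
// the methods it explicitly overrides are safe to call; the rest would panic
// on the nil embedded value, which is fine for code paths a test never hits.
type fakeLimits struct {
	Limits
	timeout int
}

func (f fakeLimits) Timeout(string) int { return f.timeout }

func main() {
	var l Limits = fakeLimits{timeout: 30}
	fmt.Println(l.Timeout("tenant-a")) // 30
	// l.RetentionEnabled("tenant-a") would panic: not overridden, embedded Limits is nil.
}
```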
