Commit

retention test
salvacorts committed Mar 19, 2024
1 parent b3de24c commit 78f085f
Showing 6 changed files with 511 additions and 98 deletions.
3 changes: 1 addition & 2 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -114,8 +114,7 @@ func New(
 		c.cfg.RetentionConfig,
 		c.limits,
 		c.bloomStore,
-		ring,
-		ringLifeCycler,
+		newFirstTokenRetentionSharding(ring, ringLifeCycler),
 		c.logger,
 	)

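The only change here is the constructor wiring: New now hands the retention manager a retentionSharding value (defined in pkg/bloomcompactor/retention.go below) instead of the raw ring and lifecycler. A minimal sketch of why that seam helps testing, using a hypothetical fake that is not part of this commit:

package bloomcompactor

// fakeRetentionSharding is a hypothetical test double: it returns a fixed
// ownership decision instead of consulting the ring, so RetentionManager
// can be exercised without a running ring.
type fakeRetentionSharding struct {
	owns bool
}

func (f fakeRetentionSharding) OwnsRetention() (bool, error) {
	return f.owns, nil
}

A test could then construct the manager with NewRetentionManager(cfg, limits, store, fakeRetentionSharding{owns: true}, logger) and call Apply directly.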
135 changes: 83 additions & 52 deletions pkg/bloomcompactor/retention.go
@@ -16,54 +16,28 @@ import (
"github.com/grafana/loki/pkg/validation"
)

type RetentionConfig struct {
Enabled bool
MaxRetentionLookbackDays int
}

func (cfg *RetentionConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, "bloom-compactor.retention.enabled", false, "Enable bloom retention.")
f.IntVar(&cfg.MaxRetentionLookbackDays, "bloom-compactor.retention.max-lookback-days", 365, "Max lookback days for retention.")
type retentionSharding interface {
OwnsRetention() (bool, error)
}

type RetentionLimits interface {
RetentionPeriod(userID string) time.Duration
StreamRetention(userID string) []validation.StreamRetention
}

type RetentionManager struct {
cfg RetentionConfig
limits RetentionLimits
bloomStore bloomshipper.Store
type firstTokenRetentionSharding struct {
ring ring.ReadRing
ringLifeCycler *ring.BasicLifecycler
logger log.Logger
}

func NewRetentionManager(
cfg RetentionConfig,
limits RetentionLimits,
bloomStore bloomshipper.Store,
ring ring.ReadRing,
ringLifeCycler *ring.BasicLifecycler,
logger log.Logger,
) *RetentionManager {
return &RetentionManager{
cfg: cfg,
limits: limits,
bloomStore: bloomStore,
func newFirstTokenRetentionSharding(ring ring.ReadRing, ringLifeCycler *ring.BasicLifecycler) *firstTokenRetentionSharding {
return &firstTokenRetentionSharding{
ring: ring,
ringLifeCycler: ringLifeCycler,
logger: log.With(logger, "component", "retention-manager"),
}
}

// ownsRetention returns true if the compactor should apply retention.
// OwnsRetention returns true if the compactor should apply retention.
// This is determined by checking if the compactor owns the smaller token in the ring.
// Note that during a ring topology change, more than one compactor may attempt to apply retention.
// This is fine since retention consists on deleting old data which should be idempotent.
func (r *RetentionManager) ownsRetention() (bool, error) {
rs, err := r.ring.GetAllHealthy(RingOp)
func (s *firstTokenRetentionSharding) OwnsRetention() (bool, error) {
rs, err := s.ring.GetAllHealthy(RingOp)
if err != nil {
return false, errors.Wrap(err, "getting ring healthy instances")
}
@@ -78,31 +52,81 @@ func (r *RetentionManager) ownsRetention() (bool, error) {
 		return int(smallerA - smallerB)
 	})
 
-	return instance.GetAddr() == r.ringLifeCycler.GetInstanceID(), nil
+	return instance.GetAddr() == s.ringLifeCycler.GetInstanceID(), nil
 }
 
+type RetentionConfig struct {
+	Enabled                  bool
+	MaxRetentionLookbackDays int
+}
+
+func (cfg *RetentionConfig) RegisterFlags(f *flag.FlagSet) {
+	f.BoolVar(&cfg.Enabled, "bloom-compactor.retention.enabled", false, "Enable bloom retention.")
+	f.IntVar(&cfg.MaxRetentionLookbackDays, "bloom-compactor.retention.max-lookback-days", 365, "Max lookback days for retention.")
+}
+
+type RetentionLimits interface {
+	RetentionPeriod(userID string) time.Duration
+	StreamRetention(userID string) []validation.StreamRetention
+}
+
+type RetentionManager struct {
+	cfg        RetentionConfig
+	limits     RetentionLimits
+	bloomStore bloomshipper.Store
+	sharding   retentionSharding
+	logger     log.Logger
+
+	// For testing
+	now func() model.Time
+}
+
+func NewRetentionManager(
+	cfg RetentionConfig,
+	limits RetentionLimits,
+	bloomStore bloomshipper.Store,
+	sharding retentionSharding,
+	logger log.Logger,
+) *RetentionManager {
+	return &RetentionManager{
+		cfg:        cfg,
+		limits:     limits,
+		bloomStore: bloomStore,
+		sharding:   sharding,
+		logger:     log.With(logger, "component", "retention-manager"),
+		now:        model.Now,
+	}
+}
+
 func (r *RetentionManager) Apply(ctx context.Context) error {
-	ownsRetention, err := r.ownsRetention()
+	if !r.cfg.Enabled {
+		return nil
+	}
+
+	ownsRetention, err := r.sharding.OwnsRetention()
 	if err != nil {
 		return errors.Wrap(err, "checking if compactor owns retention")
 	}
 	if !ownsRetention {
 		return nil
 	}
 
+	level.Info(r.logger).Log("msg", "Applying retention")
+
 	// We iterate through up to r.cfg.MaxRetentionLookbackDays days unless it's set to 0
 	// In that case, we iterate through all days
-	for i := 0; i < r.cfg.MaxRetentionLookbackDays || r.cfg.MaxRetentionLookbackDays == 0; i++ {
+	now := r.now()
+	for i := 1; i <= r.cfg.MaxRetentionLookbackDays || r.cfg.MaxRetentionLookbackDays == 0; i++ {
 		dayLogger := log.With(r.logger, "day", i)
-		day := model.Now().Add(-time.Duration(i) * 24 * time.Hour)
+		day := now.Add(-time.Duration(i) * 24 * time.Hour)
 		bloomClient, err := r.bloomStore.Client(day)
 		if err != nil {
 			level.Error(dayLogger).Log("msg", "failed to get bloom store client", "err", err)
 			break
 		}
 		objectClient := bloomClient.ObjectClient()
 
-		tenants, err := r.bloomStore.UsersForPeriod(ctx, day)
+		tenants, table, err := r.bloomStore.UsersForPeriod(ctx, day)
 		if err != nil {
 			return errors.Wrap(err, "getting users for period")
 		}
@@ -113,24 +137,28 @@ func (r *RetentionManager) Apply(ctx context.Context) error {
 			break
 		}
 
-		for _, tenantDir := range tenants {
-			tenant, err := bloomClient.ParseTenantKey(tenantDir)
-			if err != nil {
-				return errors.Wrap(err, "parsing tenant key")
-			}
-
+		for _, tenant := range tenants {
 			tenantRetention := findLongestRetention(tenant, r.limits)
-			tenantLogger := log.With(dayLogger, "tenant", tenant, "retention", tenantRetention)
-
-			if !day.Before(model.Now().Add(-tenantRetention)) {
-				level.Debug(tenantLogger).Log("msg", "tenant retention not expired")
+			if !day.Before(now.Add(-tenantRetention)) {
 				continue
 			}
 
-			// Delete all metas and blocks for this tenant in this day by deleting the tenant directory
+			tenantLogger := log.With(dayLogger, "table", table, "tenant", tenant, "retention", tenantRetention)
 			level.Info(tenantLogger).Log("msg", "applying retention to tenant")
-			if err := objectClient.DeleteObject(ctx, tenantDir.Addr()); err != nil {
-				return errors.Wrapf(err, "deleting tenant directory %s", tenantDir.Addr())
+
+			// List all keys under the tenant directory
+			// Note: we cannot delete the tenant directory directly because it is not an actual key in the object store
+			// Instead, we need to list all keys under the tenant directory and delete them one by one
+			tenantDir := bloomClient.Tenant(tenant, table)
+			tenantObjects, _, err := objectClient.List(ctx, tenantDir.Addr(), "")
+			if err != nil {
+				return errors.Wrapf(err, "listing tenant directory %s", tenantDir.Addr())
+			}
+
+			for _, object := range tenantObjects {
+				if err := objectClient.DeleteObject(ctx, object.Key); err != nil {
+					return errors.Wrapf(err, "deleting tenant directory %s", tenantDir.Addr())
+				}
 			}
 		}
 	}
@@ -143,6 +171,9 @@ func (r *RetentionManager) Apply(ctx context.Context) error {
 func findLongestRetention(tenant string, limits RetentionLimits) time.Duration {
 	globalRetention := limits.RetentionPeriod(tenant)
 	streamRetention := limits.StreamRetention(tenant)
+	if len(streamRetention) == 0 {
+		return globalRetention
+	}
 
 	maxStreamRetention := slices.MaxFunc(streamRetention, func(a, b validation.StreamRetention) int {
 		return int(a.Period - b.Period)
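The last hunk adds an early return so findLongestRetention no longer calls slices.MaxFunc on an empty slice when a tenant has no per-stream retention configured. A self-contained sketch of the same idea with stand-in types (the streamRetention struct, the final comparison against the global value, and the example durations are assumptions for illustration, since the rest of the function is collapsed in the diff above):

package main

import (
	"fmt"
	"slices"
	"time"
)

// streamRetention is a stand-in for validation.StreamRetention; only the
// Period field matters for this sketch.
type streamRetention struct {
	Period time.Duration
}

// longestRetention mirrors the idea behind findLongestRetention: the longest
// of the tenant's global retention and its per-stream retentions wins, and
// the empty-slice guard matches the check added in this commit.
func longestRetention(global time.Duration, streams []streamRetention) time.Duration {
	if len(streams) == 0 {
		return global
	}
	maxStream := slices.MaxFunc(streams, func(a, b streamRetention) int {
		return int(a.Period - b.Period)
	})
	if maxStream.Period > global {
		return maxStream.Period
	}
	return global
}

func main() {
	global := 30 * 24 * time.Hour
	streams := []streamRetention{
		{Period: 7 * 24 * time.Hour},
		{Period: 90 * 24 * time.Hour},
	}
	fmt.Println(longestRetention(global, streams)) // 2160h0m0s: the 90-day stream retention wins
	fmt.Println(longestRetention(global, nil))     // 720h0m0s: falls back to the global retention
}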

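For reference, the ownership rule described in the OwnsRetention comment above (the compactor holding the smallest token among the healthy ring members applies retention) reduces to a min-by-token selection. A self-contained sketch with hypothetical instance data; the real implementation reads instances from ring.ReadRing.GetAllHealthy(RingOp) and compares the winner against the lifecycler's own identity:

package main

import (
	"fmt"
	"slices"
)

// instance is a stand-in for ring.InstanceDesc; only the address and the
// assigned tokens matter for this sketch.
type instance struct {
	Addr   string
	Tokens []uint32
}

// ownsRetention reports whether self is the instance whose smallest token is
// the smallest across all healthy instances, mirroring the rule documented on
// firstTokenRetentionSharding.OwnsRetention.
func ownsRetention(self string, healthy []instance) bool {
	if len(healthy) == 0 {
		return false
	}
	owner := slices.MinFunc(healthy, func(a, b instance) int {
		return int(int64(slices.Min(a.Tokens)) - int64(slices.Min(b.Tokens)))
	})
	return owner.Addr == self
}

func main() {
	healthy := []instance{
		{Addr: "bloom-compactor-0", Tokens: []uint32{120, 900}},
		{Addr: "bloom-compactor-1", Tokens: []uint32{42, 512}},
	}
	fmt.Println(ownsRetention("bloom-compactor-1", healthy)) // true: it holds the smallest token (42)
	fmt.Println(ownsRetention("bloom-compactor-0", healthy)) // false
}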