Skip to content

Commit

Permalink
Fix bug with repeated tenants
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts committed Mar 20, 2024
1 parent 3103d28 commit 87a973d
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 101 deletions.
25 changes: 8 additions & 17 deletions pkg/bloomcompactor/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (r *RetentionManager) Apply(ctx context.Context) error {
}
objectClient := bloomClient.ObjectClient()

tenants, table, err := r.bloomStore.UsersForPeriod(ctx, day.ModelTime())
tenants, err := r.bloomStore.UsersFilesForPeriod(ctx, day.ModelTime())
if err != nil {
r.metrics.retentionTime.WithLabelValues(statusFailure).Observe(time.Since(start.Time()).Seconds())
r.metrics.retentionDaysPerIteration.WithLabelValues(statusFailure).Observe(float64(daysProcessed))
Expand All @@ -155,32 +155,23 @@ func (r *RetentionManager) Apply(ctx context.Context) error {
break
}

for _, tenant := range tenants {
for tenant, objectKeys := range tenants {
tenantRetention := findLongestRetention(tenant, r.limits)
expirationDay := storageconfig.NewDayTime(today.Add(-tenantRetention))
if !day.Before(expirationDay) {
continue
}

tenantLogger := log.With(dayLogger, "table", table, "tenant", tenant, "retention", tenantRetention)
tenantLogger := log.With(dayLogger, "tenant", tenant, "retention", tenantRetention)
level.Info(tenantLogger).Log("msg", "applying retention to tenant")

// List all keys under the tenant directory
// Note: we cannot delete the tenant directory directly because it is not an actual key in the object store
// Instead, we need to list all keys under the tenant directory and delete them one by one
tenantDir := bloomClient.Tenant(tenant, table)
tenantObjects, _, err := objectClient.List(ctx, tenantDir.Addr(), "")
if err != nil {
r.metrics.retentionTime.WithLabelValues(statusFailure).Observe(time.Since(start.Time()).Seconds())
r.metrics.retentionDaysPerIteration.WithLabelValues(statusFailure).Observe(float64(daysProcessed))
return errors.Wrapf(err, "listing tenant directory %s", tenantDir.Addr())
}

for _, object := range tenantObjects {
if err := objectClient.DeleteObject(ctx, object.Key); err != nil {
// Note: we cannot delete the tenant directory directly because it is not an
// actual key in the object store. Instead, we need to delete all keys one by one.
for _, key := range objectKeys {
if err := objectClient.DeleteObject(ctx, key); err != nil {
r.metrics.retentionTime.WithLabelValues(statusFailure).Observe(time.Since(start.Time()).Seconds())
r.metrics.retentionDaysPerIteration.WithLabelValues(statusFailure).Observe(float64(daysProcessed))
return errors.Wrapf(err, "deleting tenant directory %s", tenantDir.Addr())
return errors.Wrapf(err, "deleting key %s", key)
}
}
}
Expand Down
107 changes: 52 additions & 55 deletions pkg/bloomcompactor/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bloomcompactor

import (
"context"
"math"
"os"
"testing"
"time"
Expand Down Expand Up @@ -32,7 +33,7 @@ func TestRetention(t *testing.T) {
lim mockRetentionLimits
prePopulate func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore)
expectErr bool
check func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore)
check func(t *testing.T, bloomStore *bloomshipper.BloomStore)
}{
{
name: "retention disabled",
Expand All @@ -52,14 +53,14 @@ func TestRetention(t *testing.T) {
putMetasForLastNDays(t, schemaCfg, bloomStore, "2", testTime, 50)
putMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500)
},
check: func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore) {
metas := getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 500)
check: func(t *testing.T, bloomStore *bloomshipper.BloomStore) {
metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 500)
require.Equal(t, 1, len(metas))
require.Equal(t, 200, len(metas[0]))
metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "2", testTime, 500)
metas = getGroupedMetasForLastNDays(t, bloomStore, "2", testTime, 500)
require.Equal(t, 1, len(metas))
require.Equal(t, 50, len(metas[0]))
metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500)
metas = getGroupedMetasForLastNDays(t, bloomStore, "3", testTime, 500)
require.Equal(t, 1, len(metas))
require.Equal(t, 500, len(metas[0]))
},
Expand All @@ -82,14 +83,14 @@ func TestRetention(t *testing.T) {
putMetasForLastNDays(t, schemaCfg, bloomStore, "2", testTime, 50)
putMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500)
},
check: func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore) {
metas := getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 500)
check: func(t *testing.T, bloomStore *bloomshipper.BloomStore) {
metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 500)
require.Equal(t, 1, len(metas))
require.Equal(t, 200, len(metas[0]))
metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "2", testTime, 500)
metas = getGroupedMetasForLastNDays(t, bloomStore, "2", testTime, 500)
require.Equal(t, 1, len(metas))
require.Equal(t, 50, len(metas[0]))
metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500)
metas = getGroupedMetasForLastNDays(t, bloomStore, "3", testTime, 500)
require.Equal(t, 1, len(metas))
require.Equal(t, 500, len(metas[0]))
},
Expand Down Expand Up @@ -130,29 +131,29 @@ func TestRetention(t *testing.T) {
putMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500)
putMetasForLastNDays(t, schemaCfg, bloomStore, "4", testTime, 500)
},
check: func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore) {
check: func(t *testing.T, bloomStore *bloomshipper.BloomStore) {
// Tenant 1 has 40 days of retention, and we wrote 200 days of metas
// We should get two groups: 0th-40th and 101st-200th
metas := getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 500)
metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 500)
require.Equal(t, 2, len(metas))
require.Equal(t, 41, len(metas[0])) // 0-40th day
require.Equal(t, 99, len(metas[1])) // 101th-200th day

// Tenant 2 has 20 days of retention, and we wrote 50 days of metas
// We should get one group: 0th-20th
metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "2", testTime, 500)
metas = getGroupedMetasForLastNDays(t, bloomStore, "2", testTime, 500)
require.Equal(t, 1, len(metas))
require.Equal(t, 21, len(metas[0])) // 0th-20th

// Tenant 3 has 200 days of retention, and we wrote 500 days of metas
// Since the manager looks up to 100 days, we shouldn't have deleted any metas
metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500)
metas = getGroupedMetasForLastNDays(t, bloomStore, "3", testTime, 500)
require.Equal(t, 1, len(metas))
require.Equal(t, 500, len(metas[0])) // 0th-500th

// Tenant 4 has 400 days of retention, and we wrote 500 days of metas
// Since the manager looks up to 100 days, we shouldn't have deleted any metas
metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "4", testTime, 500)
metas = getGroupedMetasForLastNDays(t, bloomStore, "4", testTime, 500)
require.Equal(t, 1, len(metas))
require.Equal(t, 500, len(metas[0])) // 0th-500th
},
Expand Down Expand Up @@ -192,28 +193,28 @@ func TestRetention(t *testing.T) {
putMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500)
putMetasForLastNDays(t, schemaCfg, bloomStore, "4", testTime, 500)
},
check: func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore) {
check: func(t *testing.T, bloomStore *bloomshipper.BloomStore) {
// Tenant 1 has 40 days of retention, and we wrote 200 days of metas
// We should get one group: 0th-40th
metas := getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 500)
metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 500)
require.Equal(t, 1, len(metas))
require.Equal(t, 41, len(metas[0])) // 0-40th day

// Tenant 2 has 20 days of retention, and we wrote 50 days of metas
// We should get one group: 0th-20th
metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "2", testTime, 500)
metas = getGroupedMetasForLastNDays(t, bloomStore, "2", testTime, 500)
require.Equal(t, 1, len(metas))
require.Equal(t, 21, len(metas[0])) // 0th-20th

// Tenant 3 has 200 days of retention, and we wrote 500 days of metas
// We should get one group: 0th-200th
metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500)
metas = getGroupedMetasForLastNDays(t, bloomStore, "3", testTime, 500)
require.Equal(t, 1, len(metas))
require.Equal(t, 201, len(metas[0])) // 0th-500th

// Tenant 4 has 400 days of retention, and we wrote 500 days of metas
// We should get one group: 0th-400th
metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "4", testTime, 500)
metas = getGroupedMetasForLastNDays(t, bloomStore, "4", testTime, 500)
require.Equal(t, 1, len(metas))
require.Equal(t, 401, len(metas[0])) // 0th-500th
},
Expand All @@ -234,9 +235,9 @@ func TestRetention(t *testing.T) {
putMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 100)
putMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime.Add(-150*24*time.Hour), 50)
},
check: func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore) {
check: func(t *testing.T, bloomStore *bloomshipper.BloomStore) {
// We should get two groups: 0th-30th and 151st-200th
metas := getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 500)
metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 500)
require.Equal(t, 2, len(metas))
require.Equal(t, 31, len(metas[0])) // 0th-30th day
require.Equal(t, 50, len(metas[1])) // 151th-200th day
Expand Down Expand Up @@ -270,7 +271,7 @@ func TestRetention(t *testing.T) {
}
require.NoError(t, err)

tc.check(t, schema, bloomStore)
tc.check(t, bloomStore)
})
}
}
Expand Down Expand Up @@ -305,7 +306,7 @@ func TestRetentionRunsOncePerDay(t *testing.T) {
require.NoError(t, err)

// We should get only the first 30 days of metas
metas := getGroupedMetasForLastNDays(t, schema, bloomStore, "1", testTime, 100)
metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 100)
require.Equal(t, 1, len(metas))
require.Equal(t, 31, len(metas[0])) // 0th-30th day

Expand All @@ -315,7 +316,7 @@ func TestRetentionRunsOncePerDay(t *testing.T) {
err = rm.Apply(context.Background())
require.NoError(t, err)

metas = getGroupedMetasForLastNDays(t, schema, bloomStore, "1", testTime, 100)
metas = getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 100)
require.Equal(t, 1, len(metas))
require.Equal(t, 100, len(metas[0]))

Expand All @@ -327,12 +328,14 @@ func TestRetentionRunsOncePerDay(t *testing.T) {
require.NoError(t, err)

// We should only see the first 30 days of metas
metas = getGroupedMetasForLastNDays(t, schema, bloomStore, "1", testTime, 100)
metas = getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 100)
require.Equal(t, 1, len(metas))
require.Equal(t, 30, len(metas[0])) // 0th-30th day
}

func putMetasForLastNDays(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore, tenant string, start model.Time, days int) {
const metasPerDay = 2

startDay := storageconfig.NewDayTime(start)
endDay := storageconfig.NewDayTime(startDay.Add(-time.Duration(days) * 24 * time.Hour))
for day := startDay; day.After(endDay); day = day.Dec() {
Expand All @@ -343,52 +346,46 @@ func putMetasForLastNDays(t *testing.T, schemaCfg storageconfig.SchemaConfig, bl
bloomClient, err := bloomStore.Client(dayTable.ModelTime())
require.NoErrorf(t, err, "failed to get bloom client for day %d: %s", day, err)

err = bloomClient.PutMeta(context.Background(), bloomshipper.Meta{
MetaRef: bloomshipper.MetaRef{
Ref: bloomshipper.Ref{
TenantID: tenant,
TableName: dayTable.String(),
Bounds: v1.NewBounds(0, 100),
for i := 0; i < metasPerDay; i++ {
err = bloomClient.PutMeta(context.Background(), bloomshipper.Meta{
MetaRef: bloomshipper.MetaRef{
Ref: bloomshipper.Ref{
TenantID: tenant,
TableName: dayTable.String(),
Bounds: v1.NewBounds(model.Fingerprint(i*100), model.Fingerprint(i*100+100)),
},
},
},
Blocks: []bloomshipper.BlockRef{},
})
require.NoError(t, err)
Blocks: []bloomshipper.BlockRef{},
})
require.NoError(t, err)
}
}
}

// getGroupedMetasForLastNDays returns groups of continuous metas for the last N days.
func getGroupedMetasForLastNDays(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore, tenant string, start model.Time, days int) [][]bloomshipper.Meta {
metasGrouped := make([][]bloomshipper.Meta, 0)
currentGroup := make([]bloomshipper.Meta, 0)
func getGroupedMetasForLastNDays(t *testing.T, bloomStore *bloomshipper.BloomStore, tenant string, start model.Time, days int) [][][]bloomshipper.Meta {
metasGrouped := make([][][]bloomshipper.Meta, 0)
currentGroup := make([][]bloomshipper.Meta, 0)

startDay := storageconfig.NewDayTime(start)
endDay := storageconfig.NewDayTime(startDay.Add(-time.Duration(days) * 24 * time.Hour))

for day := startDay; day.After(endDay); day = day.Dec() {
period, err := schemaCfg.SchemaForTime(day.ModelTime())
metas, err := bloomStore.FetchMetas(context.Background(), bloomshipper.MetaSearchParams{
TenantID: tenant,
Interval: bloomshipper.NewInterval(day.ModelTime(), day.ModelTime().Add(1*time.Second)),
Keyspace: v1.NewBounds(0, math.MaxUint64),
})
require.NoError(t, err)
dayTable := storageconfig.NewDayTable(day, period.IndexTables.Prefix)

bloomClient, err := bloomStore.Client(dayTable.ModelTime())
require.NoErrorf(t, err, "failed to get bloom client for day %s: %s", day, err)

metas, err := bloomClient.GetMetas(context.Background(), []bloomshipper.MetaRef{{
Ref: bloomshipper.Ref{
TenantID: tenant,
TableName: dayTable.String(),
Bounds: v1.NewBounds(0, 100),
},
}})
if err != nil {
// Assume we have reached the end of the metas group: cut a new group
if len(metas) == 0 {
// We have reached the end of the metas group: cut a new group
if len(currentGroup) > 0 {
metasGrouped = append(metasGrouped, currentGroup)
currentGroup = make([]bloomshipper.Meta, 0)
currentGroup = make([][]bloomshipper.Meta, 0)
}
continue
}
currentGroup = append(currentGroup, metas[0])
currentGroup = append(currentGroup, metas)
}

// Append the last group if it's not empty
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func (s *dummyStore) FetchMetas(_ context.Context, _ bloomshipper.MetaSearchPara
return s.metas, nil
}

func (s *dummyStore) UsersForPeriod(_ context.Context, _ model.Time) ([]string, string, error) {
return nil, "", nil
// UsersFilesForPeriod is a stub implementation: it ignores its arguments and
// always returns a nil map together with a nil error.
func (s *dummyStore) UsersFilesForPeriod(ctx context.Context, ts model.Time) (map[string][]string, error) {
return nil, nil
}

func (s *dummyStore) Fetcher(_ model.Time) (*bloomshipper.Fetcher, error) {
Expand Down
Loading

0 comments on commit 87a973d

Please sign in to comment.