diff --git a/pkg/bloomcompactor/retention.go b/pkg/bloomcompactor/retention.go
index 285296b7cde58..6366b45f20be9 100644
--- a/pkg/bloomcompactor/retention.go
+++ b/pkg/bloomcompactor/retention.go
@@ -142,7 +142,7 @@ func (r *RetentionManager) Apply(ctx context.Context) error {
 		}
 		objectClient := bloomClient.ObjectClient()
 
-		tenants, table, err := r.bloomStore.UsersForPeriod(ctx, day.ModelTime())
+		tenants, err := r.bloomStore.UsersFilesForPeriod(ctx, day.ModelTime())
 		if err != nil {
 			r.metrics.retentionTime.WithLabelValues(statusFailure).Observe(time.Since(start.Time()).Seconds())
 			r.metrics.retentionDaysPerIteration.WithLabelValues(statusFailure).Observe(float64(daysProcessed))
@@ -155,32 +155,23 @@ func (r *RetentionManager) Apply(ctx context.Context) error {
 			break
 		}
 
-		for _, tenant := range tenants {
+		for tenant, objectKeys := range tenants {
 			tenantRetention := findLongestRetention(tenant, r.limits)
 			expirationDay := storageconfig.NewDayTime(today.Add(-tenantRetention))
 			if !day.Before(expirationDay) {
 				continue
 			}
 
-			tenantLogger := log.With(dayLogger, "table", table, "tenant", tenant, "retention", tenantRetention)
+			tenantLogger := log.With(dayLogger, "tenant", tenant, "retention", tenantRetention)
 			level.Info(tenantLogger).Log("msg", "applying retention to tenant")
 
-			// List all keys under the tenant directory
-			// Note: we cannot delete the tenant directory directly because it is not an actual key in the object store
-			// Instead, we need to list all keys under the tenant directory and delete them one by one
-			tenantDir := bloomClient.Tenant(tenant, table)
-			tenantObjects, _, err := objectClient.List(ctx, tenantDir.Addr(), "")
-			if err != nil {
-				r.metrics.retentionTime.WithLabelValues(statusFailure).Observe(time.Since(start.Time()).Seconds())
-				r.metrics.retentionDaysPerIteration.WithLabelValues(statusFailure).Observe(float64(daysProcessed))
-				return errors.Wrapf(err, "listing tenant directory %s", tenantDir.Addr())
-			}
-
-			for _, object := range tenantObjects {
-				if err := objectClient.DeleteObject(ctx, object.Key); err != nil {
+			// Note: we cannot delete the tenant directory directly because it is not an
+			// actual key in the object store. Instead, we need to delete all keys one by one.
+			for _, key := range objectKeys {
+				if err := objectClient.DeleteObject(ctx, key); err != nil {
 					r.metrics.retentionTime.WithLabelValues(statusFailure).Observe(time.Since(start.Time()).Seconds())
 					r.metrics.retentionDaysPerIteration.WithLabelValues(statusFailure).Observe(float64(daysProcessed))
-					return errors.Wrapf(err, "deleting tenant directory %s", tenantDir.Addr())
+					return errors.Wrapf(err, "deleting key %s", key)
 				}
 			}
 		}
diff --git a/pkg/bloomcompactor/retention_test.go b/pkg/bloomcompactor/retention_test.go
index 4e13108b5012e..550b42f467097 100644
--- a/pkg/bloomcompactor/retention_test.go
+++ b/pkg/bloomcompactor/retention_test.go
@@ -2,6 +2,7 @@ package bloomcompactor
 
 import (
 	"context"
+	"math"
 	"os"
 	"testing"
 	"time"
@@ -32,7 +33,7 @@ func TestRetention(t *testing.T) {
 		lim         mockRetentionLimits
 		prePopulate func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore)
 		expectErr   bool
-		check       func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore)
+		check       func(t *testing.T, bloomStore *bloomshipper.BloomStore)
 	}{
 		{
 			name: "retention disabled",
@@ -52,14 +53,14 @@ func TestRetention(t *testing.T) {
 				putMetasForLastNDays(t, schemaCfg, bloomStore, "2", testTime, 50)
 				putMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500)
 			},
-			check: func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore) {
-				metas := getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 500)
+			check: func(t *testing.T, bloomStore *bloomshipper.BloomStore) {
+				metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 500)
 				require.Equal(t, 1, len(metas))
 				require.Equal(t, 200, len(metas[0]))
-				metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "2", testTime, 500)
+				metas = getGroupedMetasForLastNDays(t, bloomStore, "2", testTime, 500)
 				require.Equal(t, 1, len(metas))
 				require.Equal(t, 50, len(metas[0]))
-				metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500)
+				metas = getGroupedMetasForLastNDays(t, bloomStore, "3", testTime, 500)
 				require.Equal(t, 1, len(metas))
 				require.Equal(t, 500, len(metas[0]))
 			},
@@ -82,14 +83,14 @@ func TestRetention(t *testing.T) {
 				putMetasForLastNDays(t, schemaCfg, bloomStore, "2", testTime, 50)
 				putMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500)
 			},
-			check: func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore) {
-				metas := getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 500)
+			check: func(t *testing.T, bloomStore *bloomshipper.BloomStore) {
+				metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 500)
 				require.Equal(t, 1, len(metas))
 				require.Equal(t, 200, len(metas[0]))
-				metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "2", testTime, 500)
+				metas = getGroupedMetasForLastNDays(t, bloomStore, "2", testTime, 500)
 				require.Equal(t, 1, len(metas))
 				require.Equal(t, 50, len(metas[0]))
-				metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500)
+				metas = getGroupedMetasForLastNDays(t, bloomStore, "3", testTime, 500)
 				require.Equal(t, 1, len(metas))
 				require.Equal(t, 500, len(metas[0]))
 			},
@@ -130,29 +131,29 @@ func TestRetention(t *testing.T) {
 				putMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500)
 				putMetasForLastNDays(t, schemaCfg, bloomStore, "4", testTime, 500)
 			},
-			check: func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore) {
+			check: func(t *testing.T, bloomStore *bloomshipper.BloomStore) {
 				// Tenant 1 has 40 days of retention, and we wrote 200 days of metas
 				// We should get two groups: 0th-40th and 101th-200th
-				metas := getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 500)
+				metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 500)
 				require.Equal(t, 2, len(metas))
 				require.Equal(t, 41, len(metas[0])) // 0-40th day
 				require.Equal(t, 99, len(metas[1])) // 101th-200th day
 
 				// Tenant 2 has 20 days of retention, and we wrote 50 days of metas
 				// We should get one group: 0th-20th
-				metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "2", testTime, 500)
+				metas = getGroupedMetasForLastNDays(t, bloomStore, "2", testTime, 500)
 				require.Equal(t, 1, len(metas))
 				require.Equal(t, 21, len(metas[0])) // 0th-20th
 
 				// Tenant 3 has 200 days of retention, and we wrote 500 days of metas
 				// Since the manager looks up to 100 days, we shouldn't have deleted any metas
-				metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500)
+				metas = getGroupedMetasForLastNDays(t, bloomStore, "3", testTime, 500)
 				require.Equal(t, 1, len(metas))
 				require.Equal(t, 500, len(metas[0])) // 0th-500th
 
 				// Tenant 4 has 400 days of retention, and we wrote 500 days of metas
 				// Since the manager looks up to 100 days, we shouldn't have deleted any metas
-				metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "4", testTime, 500)
+				metas = getGroupedMetasForLastNDays(t, bloomStore, "4", testTime, 500)
 				require.Equal(t, 1, len(metas))
 				require.Equal(t, 500, len(metas[0])) // 0th-500th
 			},
@@ -192,28 +193,28 @@ func TestRetention(t *testing.T) {
 				putMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500)
 				putMetasForLastNDays(t, schemaCfg, bloomStore, "4", testTime, 500)
 			},
-			check: func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore) {
+			check: func(t *testing.T, bloomStore *bloomshipper.BloomStore) {
 				// Tenant 1 has 40 days of retention, and we wrote 200 days of metas
 				// We should get one groups: 0th-40th
-				metas := getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 500)
+				metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 500)
 				require.Equal(t, 1, len(metas))
 				require.Equal(t, 41, len(metas[0])) // 0-40th day
 
 				// Tenant 2 has 20 days of retention, and we wrote 50 days of metas
 				// We should get one group: 0th-20th
-				metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "2", testTime, 500)
+				metas = getGroupedMetasForLastNDays(t, bloomStore, "2", testTime, 500)
 				require.Equal(t, 1, len(metas))
 				require.Equal(t, 21, len(metas[0])) // 0th-20th
 
 				// Tenant 3 has 200 days of retention, and we wrote 500 days of metas
 				// We should get one group: 0th-200th
-				metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500)
+				metas = getGroupedMetasForLastNDays(t, bloomStore, "3", testTime, 500)
 				require.Equal(t, 1, len(metas))
 				require.Equal(t, 201, len(metas[0])) // 0th-500th
 
 				// Tenant 4 has 400 days of retention, and we wrote 500 days of metas
 				// Since the manager looks up to 100 days, we shouldn't have deleted any metas
-				metas = getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "4", testTime, 500)
+				metas = getGroupedMetasForLastNDays(t, bloomStore, "4", testTime, 500)
 				require.Equal(t, 1, len(metas))
 				require.Equal(t, 401, len(metas[0])) // 0th-500th
 			},
@@ -234,9 +235,9 @@ func TestRetention(t *testing.T) {
 				putMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 100)
 				putMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime.Add(-150*24*time.Hour), 50)
 			},
-			check: func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore) {
+			check: func(t *testing.T, bloomStore *bloomshipper.BloomStore) {
 				// We should get two groups: 0th-30th and 151th-200th
-				metas := getGroupedMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 500)
+				metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 500)
 				require.Equal(t, 2, len(metas))
 				require.Equal(t, 31, len(metas[0])) // 0th-30th day
 				require.Equal(t, 50, len(metas[1])) // 151th-200th day
@@ -270,7 +271,7 @@ func TestRetention(t *testing.T) {
 			}
 			require.NoError(t, err)
 
-			tc.check(t, schema, bloomStore)
+			tc.check(t, bloomStore)
 		})
 	}
 }
@@ -305,7 +306,7 @@ func TestRetentionRunsOncePerDay(t *testing.T) {
 	require.NoError(t, err)
 
 	// We should get only the first 30 days of metas
-	metas := getGroupedMetasForLastNDays(t, schema, bloomStore, "1", testTime, 100)
+	metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 100)
 	require.Equal(t, 1, len(metas))
 	require.Equal(t, 31, len(metas[0])) // 0th-30th day
 
@@ -315,7 +316,7 @@ func TestRetentionRunsOncePerDay(t *testing.T) {
 	err = rm.Apply(context.Background())
 	require.NoError(t, err)
 
-	metas = getGroupedMetasForLastNDays(t, schema, bloomStore, "1", testTime, 100)
+	metas = getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 100)
 	require.Equal(t, 1, len(metas))
 	require.Equal(t, 100, len(metas[0]))
 
@@ -327,12 +328,14 @@ func TestRetentionRunsOncePerDay(t *testing.T) {
 	require.NoError(t, err)
 
 	// We should only see the first 30 days of metas
-	metas = getGroupedMetasForLastNDays(t, schema, bloomStore, "1", testTime, 100)
+	metas = getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 100)
 	require.Equal(t, 1, len(metas))
 	require.Equal(t, 30, len(metas[0])) // 0th-30th day
 }
 
 func putMetasForLastNDays(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore, tenant string, start model.Time, days int) {
+	const metasPerDay = 2
+
 	startDay := storageconfig.NewDayTime(start)
 	endDay := storageconfig.NewDayTime(startDay.Add(-time.Duration(days) * 24 * time.Hour))
 	for day := startDay; day.After(endDay); day = day.Dec() {
@@ -343,52 +346,46 @@ func putMetasForLastNDays(t *testing.T, schemaCfg storageconfig.SchemaConfig, bl
 		bloomClient, err := bloomStore.Client(dayTable.ModelTime())
 		require.NoErrorf(t, err, "failed to get bloom client for day %d: %s", day, err)
 
-		err = bloomClient.PutMeta(context.Background(), bloomshipper.Meta{
-			MetaRef: bloomshipper.MetaRef{
-				Ref: bloomshipper.Ref{
-					TenantID:  tenant,
-					TableName: dayTable.String(),
-					Bounds:    v1.NewBounds(0, 100),
+		for i := 0; i < metasPerDay; i++ {
+			err = bloomClient.PutMeta(context.Background(), bloomshipper.Meta{
+				MetaRef: bloomshipper.MetaRef{
+					Ref: bloomshipper.Ref{
+						TenantID:  tenant,
+						TableName: dayTable.String(),
+						Bounds:    v1.NewBounds(model.Fingerprint(i*100), model.Fingerprint(i*100+100)),
+					},
 				},
-			},
-			Blocks: []bloomshipper.BlockRef{},
-		})
-		require.NoError(t, err)
+				Blocks: []bloomshipper.BlockRef{},
+			})
+			require.NoError(t, err)
+		}
 	}
 }
 
 // getMetasForLastNDays returns groups of continuous metas for the last N days.
-func getGroupedMetasForLastNDays(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore, tenant string, start model.Time, days int) [][]bloomshipper.Meta {
-	metasGrouped := make([][]bloomshipper.Meta, 0)
-	currentGroup := make([]bloomshipper.Meta, 0)
+func getGroupedMetasForLastNDays(t *testing.T, bloomStore *bloomshipper.BloomStore, tenant string, start model.Time, days int) [][][]bloomshipper.Meta {
+	metasGrouped := make([][][]bloomshipper.Meta, 0)
+	currentGroup := make([][]bloomshipper.Meta, 0)
 
 	startDay := storageconfig.NewDayTime(start)
 	endDay := storageconfig.NewDayTime(startDay.Add(-time.Duration(days) * 24 * time.Hour))
 	for day := startDay; day.After(endDay); day = day.Dec() {
-		period, err := schemaCfg.SchemaForTime(day.ModelTime())
+		metas, err := bloomStore.FetchMetas(context.Background(), bloomshipper.MetaSearchParams{
+			TenantID: tenant,
+			Interval: bloomshipper.NewInterval(day.ModelTime(), day.ModelTime().Add(1*time.Second)),
+			Keyspace: v1.NewBounds(0, math.MaxUint64),
+		})
 		require.NoError(t, err)
-		dayTable := storageconfig.NewDayTable(day, period.IndexTables.Prefix)
-
-		bloomClient, err := bloomStore.Client(dayTable.ModelTime())
-		require.NoErrorf(t, err, "failed to get bloom client for day %s: %s", day, err)
-
-		metas, err := bloomClient.GetMetas(context.Background(), []bloomshipper.MetaRef{{
-			Ref: bloomshipper.Ref{
-				TenantID:  tenant,
-				TableName: dayTable.String(),
-				Bounds:    v1.NewBounds(0, 100),
-			},
-		}})
-		if err != nil {
-			// Assume we have reached the end of the metas group: cut a new group
+		if len(metas) == 0 {
+			// We have reached the end of the metas group: cut a new group
 			if len(currentGroup) > 0 {
 				metasGrouped = append(metasGrouped, currentGroup)
-				currentGroup = make([]bloomshipper.Meta, 0)
+				currentGroup = make([][]bloomshipper.Meta, 0)
 			}
 			continue
 		}
 
-		currentGroup = append(currentGroup, metas[0])
+		currentGroup = append(currentGroup, metas)
 	}
 
 	// Append the last group if it's not empty
diff --git a/pkg/bloomgateway/processor_test.go b/pkg/bloomgateway/processor_test.go
index 764c1953fc70f..1464aaa86b9db 100644
--- a/pkg/bloomgateway/processor_test.go
+++ b/pkg/bloomgateway/processor_test.go
@@ -54,8 +54,8 @@ func (s *dummyStore) FetchMetas(_ context.Context, _ bloomshipper.MetaSearchPara
 	return s.metas, nil
 }
 
-func (s *dummyStore) UsersForPeriod(_ context.Context, _ model.Time) ([]string, string, error) {
-	return nil, "", nil
+func (s *dummyStore) UsersFilesForPeriod(ctx context.Context, ts model.Time) (map[string][]string, error) {
+	return nil, nil
 }
 
 func (s *dummyStore) Fetcher(_ model.Time) (*bloomshipper.Fetcher, error) {
diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go
index 6c0e51cc9fcbb..0912487647da0 100644
--- a/pkg/storage/stores/shipper/bloomshipper/store.go
+++ b/pkg/storage/stores/shipper/bloomshipper/store.go
@@ -5,14 +5,6 @@ import (
 	"fmt"
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
-	"github.com/pkg/errors"
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/common/model"
-	"golang.org/x/exp/slices"
-	"path"
-	"sort"
-	"time"
-
 	"github.com/grafana/loki/pkg/storage"
 	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/pkg/storage/chunk/cache"
@@ -20,6 +12,12 @@ import (
 	"github.com/grafana/loki/pkg/storage/chunk/client/util"
 	"github.com/grafana/loki/pkg/storage/config"
 	"github.com/grafana/loki/pkg/util/constants"
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/model"
+	"golang.org/x/exp/slices"
+	"path"
+	"sort"
 )
 
 var (
@@ -30,7 +28,7 @@ type Store interface {
 	ResolveMetas(ctx context.Context, params MetaSearchParams) ([][]MetaRef, []*Fetcher, error)
 	FetchMetas(ctx context.Context, params MetaSearchParams) ([]Meta, error)
 	FetchBlocks(ctx context.Context, refs []BlockRef, opts ...FetchOption) ([]*CloseableBlockQuerier, error)
-	UsersForPeriod(ctx context.Context, ts model.Time) ([]string, string, error)
+	UsersFilesForPeriod(ctx context.Context, ts model.Time) (map[string][]string, error)
 	Fetcher(ts model.Time) (*Fetcher, error)
 	Client(ts model.Time) (Client, error)
 	Stop()
@@ -128,14 +126,9 @@ func (b *bloomStoreEntry) FetchBlocks(ctx context.Context, refs []BlockRef, _ ..
 	return b.fetcher.FetchBlocks(ctx, refs)
 }
 
-func (b *bloomStoreEntry) UsersForPeriod(ctx context.Context, ts model.Time) ([]string, string, error) {
-	tenants := make([]string, 0, 100)
-	tables := tablesForRange(b.cfg, NewInterval(ts, ts.Add(1*time.Second))) // +1 second since end time is exclusive if falls just on the boundary of a day
-	if len(tables) != 1 {
-		return tenants, "", fmt.Errorf("exactly one table expected for TS %s", ts.String())
-	}
-	table := tables[0]
-
+func (b *bloomStoreEntry) UsersFilesForPeriod(ctx context.Context, ts model.Time) (map[string][]string, error) {
+	dayTable := config.NewDayTable(config.NewDayTime(ts), b.cfg.IndexTables.Prefix)
+	table := dayTable.Addr()
 	prefix := path.Join(rootFolder, table)
 	level.Debug(b.fetcher.logger).Log(
 		"msg", "listing tenants",
@@ -143,24 +136,35 @@ func (b *bloomStoreEntry) UsersForPeriod(ctx context.Context, ts model.Time) ([]
 		"table", table,
 		"prefix", prefix,
 	)
-	list, _, err := b.objectClient.List(ctx, prefix, "")
+	objects, _, err := b.objectClient.List(ctx, prefix, "")
 	if err != nil {
 		// If there is no table for the given time, we should return an empty list of tenants.
 		if b.objectClient.IsObjectNotFoundErr(err) {
-			return tenants, "", nil
+			return nil, nil
 		}
-		return nil, "", fmt.Errorf("error listing tenants under prefix [%s]: %w", prefix, err)
+		return nil, fmt.Errorf("error listing tenants under prefix [%s]: %w", prefix, err)
+	}
+	if len(objects) == 0 {
+		return nil, nil
 	}
-	for _, object := range list {
+
+	// TODO(salvacorts): Use pooling if this becomes a problem.
+	tenants := make(map[string][]string, 100)
+	for _, object := range objects {
 		tenant, err := b.ParseTenantKey(key(object.Key))
 		if err != nil {
-			return nil, "", fmt.Errorf("error parsing tenant key [%s]: %w", object.Key, err)
+			return nil, fmt.Errorf("error parsing tenant key [%s]: %w", object.Key, err)
+		}
+
+		if _, ok := tenants[tenant]; !ok {
+			tenants[tenant] = make([]string, 0, 100)
 		}
-		tenants = append(tenants, tenant)
+
+		tenants[tenant] = append(tenants[tenant], object.Key)
 	}
-	return tenants, table, nil
+
+	return tenants, nil
 }
 
 // Fetcher implements Store.
@@ -289,11 +293,11 @@ func (b *BloomStore) Block(ref BlockRef) (loc Location) {
 	return
 }
 
-func (b *BloomStore) UsersForPeriod(ctx context.Context, ts model.Time) ([]string, string, error) {
+func (b *BloomStore) UsersFilesForPeriod(ctx context.Context, ts model.Time) (map[string][]string, error) {
 	if store := b.getStore(ts); store != nil {
-		return store.UsersForPeriod(ctx, ts)
+		return store.UsersFilesForPeriod(ctx, ts)
 	}
-	return nil, "", errNoStore
+	return nil, errNoStore
 }
 
 // Fetcher implements Store.