diff --git a/pkg/bloomcompactor/tsdb_test.go b/pkg/bloomcompactor/tsdb_test.go index bb3040304b205..30fc668a5a927 100644 --- a/pkg/bloomcompactor/tsdb_test.go +++ b/pkg/bloomcompactor/tsdb_test.go @@ -62,7 +62,7 @@ func TestTSDBSeriesIter(t *testing.T) { }, } srcItr := v1.NewSliceIter(input) - itr, err := NewTSDBSeriesIter(context.Background(), forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64)) + itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64)) require.NoError(t, err) v1.EqualIterators[*v1.Series]( @@ -79,7 +79,7 @@ func TestTSDBSeriesIter_Expiry(t *testing.T) { t.Run("expires on creation", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - itr, err := NewTSDBSeriesIter(ctx, forSeriesTestImpl{ + itr, err := NewTSDBSeriesIter(ctx, "", forSeriesTestImpl{ {}, // a single entry }, v1.NewBounds(0, math.MaxUint64)) require.Error(t, err) @@ -88,7 +88,7 @@ func TestTSDBSeriesIter_Expiry(t *testing.T) { t.Run("expires during consumption", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - itr, err := NewTSDBSeriesIter(ctx, forSeriesTestImpl{ + itr, err := NewTSDBSeriesIter(ctx, "", forSeriesTestImpl{ {}, {}, }, v1.NewBounds(0, math.MaxUint64)) diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index e3060f873b875..185f41a1a4fc8 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -35,6 +35,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/fetcher" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/index/stats" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" "github.com/grafana/loki/pkg/util/constants" "github.com/grafana/loki/pkg/validation" ) @@ -364,6 +365,14 @@ func (s *testStore) GetChunks(_ context.Context, _ string, _, _ model.Time, _ ch return nil, nil, nil } +func (s *testStore) GetShards(_ context.Context, _ string, _, _ model.Time, _ uint64, _ chunk.Predicate) ([]logproto.Shard, error) { + return nil, nil +} + +func (s *testStore) HasForSeries(_, _ model.Time) (sharding.ForSeries, bool) { + return nil, false +} + func (s *testStore) GetSchemaConfigs() []config.PeriodConfig { return defaultPeriodConfigs } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 1f62821e1cc8b..f1d4fbf0d114a 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -43,6 +43,7 @@ import ( "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/index/seriesvolume" "github.com/grafana/loki/pkg/storage/stores/index/stats" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" "github.com/grafana/loki/pkg/util/constants" "github.com/grafana/loki/pkg/validation" ) @@ -478,6 +479,14 @@ func (s *mockStore) Stats(_ context.Context, _ string, _, _ model.Time, _ ...*la }, nil } +func (m *mockStore) GetShards(_ context.Context, _ string, _, _ model.Time, _ uint64, _ chunk.Predicate) ([]logproto.Shard, error) { + return nil, nil +} + +func (m *mockStore) HasForSeries(_, _ model.Time) (sharding.ForSeries, bool) { + return nil, false +} + func (s *mockStore) Volume(_ context.Context, _ string, _, _ model.Time, limit int32, _ []string, _ string, _ ...*labels.Matcher) (*logproto.VolumeResponse, error) { return &logproto.VolumeResponse{ Volumes: []logproto.Volume{ diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index e7a3b79fd1e23..5dea1144d9a18 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -398,7 +398,7 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( results, err := ev.Downstream(ctx, []DownstreamQuery{{ Params: ParamsWithShardsOverride{ Params: ParamsWithExpressionOverride{Params: params, ExpressionOverride: e.SampleExpr}, - ShardsOverride: Shards(shards).Encode(), + ShardsOverride: shards.Encode(), }, }}, acc) if err != nil { diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 268e05528f781..5ed1191fcb252 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -31,6 +31,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/fetcher" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/index/stats" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/validation" ) @@ -371,6 +372,14 @@ func (s *storeMock) Stats(_ context.Context, _ string, _, _ model.Time, _ ...*la return nil, nil } +func (s *storeMock) GetShards(_ context.Context, _ string, _, _ model.Time, _ uint64, _ chunk.Predicate) ([]logproto.Shard, error) { + return nil, nil +} + +func (s *storeMock) HasForSeries(_, _ model.Time) (sharding.ForSeries, bool) { + return nil, false +} + func (s *storeMock) Volume(ctx context.Context, userID string, from, through model.Time, _ int32, targetLabels []string, _ string, matchers ...*labels.Matcher) (*logproto.VolumeResponse, error) { args := s.Called(ctx, userID, from, through, targetLabels, matchers) return args.Get(0).(*logproto.VolumeResponse), args.Error(1) @@ -547,6 +556,18 @@ func (q *querierMock) IndexStats(_ context.Context, _ *loghttp.RangeQuery) (*sta return nil, nil } +func (q *querierMock) GetShards(_ context.Context, _ string, _, _ model.Time, _ uint64, _ chunk.Predicate) ([]logproto.Shard, error) { + return nil, nil +} + +func (q *querierMock) HasForSeries(_, _ model.Time) (sharding.ForSeries, bool) { + return nil, false +} + +func (q *querierMock) IndexShards(ctx context.Context, req *loghttp.RangeQuery, targetBytesPerShard uint64) (*logproto.ShardsResponse, error) { + return nil, errors.New("unimplemented") +} + func (q *querierMock) Volume(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error) { args := q.MethodCalled("Volume", ctx, req) diff --git a/pkg/querier/queryrange/downstreamer_test.go b/pkg/querier/queryrange/downstreamer_test.go index cadfceeee20e3..8a305176b6870 100644 --- a/pkg/querier/queryrange/downstreamer_test.go +++ b/pkg/querier/queryrange/downstreamer_test.go @@ -22,6 +22,7 @@ import ( "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/querier/astmapper" "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" ) @@ -290,7 +291,7 @@ func TestInstanceFor(t *testing.T) { Params: logql.ParamsWithShardsOverride{ Params: newParams(), ShardsOverride: logql.Shards{ - {Shard: 0, Of: 2}, + logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 0, Of: 2}), }.Encode(), }, }, @@ -298,7 +299,7 @@ func TestInstanceFor(t *testing.T) { Params: logql.ParamsWithShardsOverride{ Params: newParams(), ShardsOverride: logql.Shards{ - {Shard: 1, Of: 2}, + logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 1, Of: 2}), }.Encode(), }, }, @@ -363,8 +364,10 @@ func TestInstanceDownstream(t *testing.T) { queries := []logql.DownstreamQuery{ { Params: logql.ParamsWithShardsOverride{ - Params: logql.ParamsWithExpressionOverride{Params: params, ExpressionOverride: expr}, - ShardsOverride: logql.Shards{{Shard: 0, Of: 2}}.Encode(), + Params: logql.ParamsWithExpressionOverride{Params: params, ExpressionOverride: expr}, + ShardsOverride: logql.Shards{ + logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 0, Of: 2}), + }.Encode(), }, }, } diff --git a/pkg/storage/bloom/v1/bounds.go b/pkg/storage/bloom/v1/bounds.go index 476ab2064ce10..979e30caf5cb9 100644 --- a/pkg/storage/bloom/v1/bounds.go +++ b/pkg/storage/bloom/v1/bounds.go @@ -42,6 +42,7 @@ func BoundsFromProto(pb logproto.FPBounds) FingerprintBounds { // Unsafe cast to avoid allocation. This _requires_ that the underlying types are the same // which is checked by the compiler above func MultiBoundsFromProto(pb []logproto.FPBounds) []FingerprintBounds { + //nolint:unconvert return MultiFingerprintBounds(*(*MultiFingerprintBounds)(unsafe.Pointer(&pb))) } diff --git a/pkg/storage/stores/composite_store.go b/pkg/storage/stores/composite_store.go index 6108e7f4de7cd..74f03d87d147d 100644 --- a/pkg/storage/stores/composite_store.go +++ b/pkg/storage/stores/composite_store.go @@ -247,7 +247,7 @@ func (c CompositeStore) GetShards( func (c CompositeStore) HasForSeries(from, through model.Time) (sharding.ForSeries, bool) { var impls []sharding.ForSeries - c.forStores(context.Background(), from, through, func(_ context.Context, from, through model.Time, store Store) error { + _ = c.forStores(context.Background(), from, through, func(_ context.Context, from, through model.Time, store Store) error { impl, ok := store.HasForSeries(from, through) if ok { impls = append(impls, impl) diff --git a/pkg/storage/stores/series/series_index_store.go b/pkg/storage/stores/series/series_index_store.go index f96fb62085fce..a7d5a2f89e00a 100644 --- a/pkg/storage/stores/series/series_index_store.go +++ b/pkg/storage/stores/series/series_index_store.go @@ -762,11 +762,11 @@ func (c *IndexReaderWriter) Volume(_ context.Context, _ string, _, _ model.Time, } func (c *IndexReaderWriter) GetShards( - ctx context.Context, - userID string, - from, through model.Time, - targetBytesPerShard uint64, - predicte chunk.Predicate, + _ context.Context, + _ string, + _, _ model.Time, + _ uint64, + _ chunk.Predicate, ) ([]logproto.Shard, error) { return nil, errors.New("unimplemented GetShards() on legacy index stores") } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/sharding/power_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/sharding/power_test.go index c806c81e9b2fc..19ca677e59766 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/sharding/power_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/sharding/power_test.go @@ -5,7 +5,6 @@ import ( "testing" "github.com/grafana/loki/pkg/storage/stores/index/stats" - "github.com/grafana/loki/pkg/validation" "github.com/stretchr/testify/require" ) @@ -22,52 +21,52 @@ func TestGuessShardFactor(t *testing.T) { { exp: 4, stats: stats.Stats{ - Bytes: validation.DefaultTSDBMaxBytesPerShard * 4, + Bytes: DefaultTSDBMaxBytesPerShard * 4, }, }, { // round up shard factor exp: 16, stats: stats.Stats{ - Bytes: validation.DefaultTSDBMaxBytesPerShard * 15, + Bytes: DefaultTSDBMaxBytesPerShard * 15, }, }, { exp: 2, stats: stats.Stats{ - Bytes: validation.DefaultTSDBMaxBytesPerShard + 1, + Bytes: DefaultTSDBMaxBytesPerShard + 1, }, }, { exp: 0, stats: stats.Stats{ - Bytes: validation.DefaultTSDBMaxBytesPerShard, + Bytes: DefaultTSDBMaxBytesPerShard, }, }, { maxShards: 8, exp: 4, stats: stats.Stats{ - Bytes: validation.DefaultTSDBMaxBytesPerShard * 4, + Bytes: DefaultTSDBMaxBytesPerShard * 4, }, }, { maxShards: 2, exp: 2, stats: stats.Stats{ - Bytes: validation.DefaultTSDBMaxBytesPerShard * 4, + Bytes: DefaultTSDBMaxBytesPerShard * 4, }, }, { maxShards: 1, exp: 0, stats: stats.Stats{ - Bytes: validation.DefaultTSDBMaxBytesPerShard * 4, + Bytes: DefaultTSDBMaxBytesPerShard * 4, }, }, } { t.Run(fmt.Sprintf("%+v", tc.stats), func(t *testing.T) { - require.Equal(t, tc.exp, GuessShardFactor(tc.stats.Bytes, uint64(validation.DefaultTSDBMaxBytesPerShard), tc.maxShards)) + require.Equal(t, tc.exp, GuessShardFactor(tc.stats.Bytes, uint64(DefaultTSDBMaxBytesPerShard), tc.maxShards)) }) } }