diff --git a/pkg/ingester/checkpoint.go b/pkg/ingester/checkpoint.go index 46ad542bcd22..05a3b8e34ca3 100644 --- a/pkg/ingester/checkpoint.go +++ b/pkg/ingester/checkpoint.go @@ -2,6 +2,7 @@ package ingester import ( "bytes" + "context" fmt "fmt" "io/ioutil" "os" @@ -206,7 +207,7 @@ func newStreamsIterator(ing ingesterInstances) *streamIterator { inst.streamsMtx.RLock() streams := make([]*stream, 0, len(inst.streams)) inst.streamsMtx.RUnlock() - _ = inst.forAllStreams(func(s *stream) error { + _ = inst.forAllStreams(context.Background(), func(s *stream) error { streams = append(streams, s) return nil }) diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index b19c8cb53c9d..c0f7937e1e72 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -242,7 +242,6 @@ func TestUnflushedChunks(t *testing.T) { } func TestIngesterWALBackpressureSegments(t *testing.T) { - walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal") require.Nil(t, err) defer os.RemoveAll(walDir) @@ -287,7 +286,6 @@ func TestIngesterWALBackpressureSegments(t *testing.T) { } func TestIngesterWALBackpressureCheckpoint(t *testing.T) { - walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal") require.Nil(t, err) defer os.RemoveAll(walDir) @@ -353,7 +351,6 @@ func expectCheckpoint(t *testing.T, walDir string, shouldExist bool, max time.Du return } } - } // mkPush makes approximately totalSize bytes of log lines across min(500, totalSize) streams @@ -456,7 +453,7 @@ func Test_SeriesIterator(t *testing.T) { limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) for i := 0; i < 3; i++ { - inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil) + inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil) require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}})) require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}})) instances = append(instances, inst) @@ -506,7 +503,7 @@ func Benchmark_SeriesIterator(b *testing.B) { limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) for i := range instances { - inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, nil, noopWAL{}, NilMetrics, nil) + inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, nil, noopWAL{}, NilMetrics, nil, nil) require.NoError(b, inst.Push(context.Background(), &logproto.PushRequest{ diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index f59871ed9716..ad2d5fbf1630 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/util/runtime" "github.com/cortexproject/cortex/pkg/chunk" @@ -340,6 +341,8 @@ func (s *testStore) GetSchemaConfigs() []chunk.PeriodConfig { func (s *testStore) Stop() {} +func (s *testStore) SetChunkFilterer(_ storage.RequestChunkFilterer) {} + func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]logproto.Stream { userIDs := []string{"1", "2", "3"} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 81cf22dadaf9..09c6607bc61c 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -158,6 +158,8 @@ type Ingester struct { metrics *ingesterMetrics wal WAL + + chunkFilter storage.RequestChunkFilterer } // ChunkStore is the interface we need to store chunks. @@ -167,6 +169,7 @@ type ChunkStore interface { SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) GetSchemaConfigs() []chunk.PeriodConfig + SetChunkFilterer(chunkFilter storage.RequestChunkFilterer) } // New makes a new Ingester. @@ -220,6 +223,10 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid return i, nil } +func (i *Ingester) SetChunkFilterer(chunkFilter storage.RequestChunkFilterer) { + i.chunkFilter = chunkFilter +} + func (i *Ingester) starting(ctx context.Context) error { if i.cfg.WAL.Enabled { // Ignore retain period during wal replay. @@ -404,7 +411,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance { defer i.instancesMtx.Unlock() inst, ok = i.instances[instanceID] if !ok { - inst = newInstance(&i.cfg, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch) + inst = newInstance(&i.cfg, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter) i.instances[instanceID] = inst } return inst @@ -677,7 +684,7 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_ return err } - if err := instance.addNewTailer(tailer); err != nil { + if err := instance.addNewTailer(queryServer.Context(), tailer); err != nil { return err } tailer.loop() diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 36743044af3d..659884bdbf8d 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -25,6 +25,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -284,6 +285,9 @@ func (s *mockStore) GetSchemaConfigs() []chunk.PeriodConfig { return nil } +func (s *mockStore) SetChunkFilterer(_ storage.RequestChunkFilterer) { +} + type mockQuerierServer struct { ctx context.Context resps []*logproto.QueryResponse @@ -448,7 +452,6 @@ func TestIngester_boltdbShipperMaxLookBack(t *testing.T) { } func TestValidate(t *testing.T) { - for i, tc := range []struct { in Config err bool diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 4711e4dcd108..65b0f3e955a7 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -28,6 +28,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/stats" + "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -89,9 +90,11 @@ type instance struct { flushOnShutdownSwitch *OnceSwitch metrics *ingesterMetrics + + chunkFilter storage.RequestChunkFilterer } -func newInstance(cfg *Config, instanceID string, limiter *Limiter, configs *runtime.TenantConfigs, wal WAL, metrics *ingesterMetrics, flushOnShutdownSwitch *OnceSwitch) *instance { +func newInstance(cfg *Config, instanceID string, limiter *Limiter, configs *runtime.TenantConfigs, wal WAL, metrics *ingesterMetrics, flushOnShutdownSwitch *OnceSwitch, chunkFilter storage.RequestChunkFilterer) *instance { i := &instance{ cfg: cfg, streams: map[string]*stream{}, @@ -110,6 +113,8 @@ func newInstance(cfg *Config, instanceID string, limiter *Limiter, configs *runt wal: wal, metrics: metrics, flushOnShutdownSwitch: flushOnShutdownSwitch, + + chunkFilter: chunkFilter, } i.mapper = newFPMapper(i.getLabelsFromFingerprint) return i @@ -295,7 +300,9 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) ([]iter ingStats := stats.GetIngesterData(ctx) var iters []iter.EntryIterator + err = i.forMatchingStreams( + ctx, expr.Matchers(), func(stream *stream) error { iter, err := stream.Iterator(ctx, ingStats, req.Start, req.End, req.Direction, pipeline.ForStream(stream.labels)) @@ -326,6 +333,7 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams ingStats := stats.GetIngesterData(ctx) var iters []iter.SampleIterator err = i.forMatchingStreams( + ctx, expr.Selector().Matchers(), func(stream *stream) error { iter, err := stream.SampleIterator(ctx, ingStats, req.Start, req.End, extractor.ForStream(stream.labels)) @@ -363,7 +371,7 @@ func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logpro }, nil } -func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) { +func (i *instance) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) { groups, err := loghttp.Match(req.GetGroups()) if err != nil { return nil, err @@ -374,7 +382,7 @@ func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logp // If no matchers were supplied we include all streams. if len(groups) == 0 { series = make([]logproto.SeriesIdentifier, 0, len(i.streams)) - err = i.forAllStreams(func(stream *stream) error { + err = i.forAllStreams(ctx, func(stream *stream) error { // consider the stream only if it overlaps the request time range if shouldConsiderStream(stream, req) { series = append(series, logproto.SeriesIdentifier{ @@ -389,7 +397,7 @@ func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logp } else { dedupedSeries := make(map[uint64]logproto.SeriesIdentifier) for _, matchers := range groups { - err = i.forMatchingStreams(matchers, func(stream *stream) error { + err = i.forMatchingStreams(ctx, matchers, func(stream *stream) error { // consider the stream only if it overlaps the request time range if shouldConsiderStream(stream, req) { // exit early when this stream was added by an earlier group @@ -426,11 +434,18 @@ func (i *instance) numStreams() int { // forAllStreams will execute a function for all streams in the instance. // It uses a function in order to enable generic stream access without accidentally leaking streams under the mutex. -func (i *instance) forAllStreams(fn func(*stream) error) error { +func (i *instance) forAllStreams(ctx context.Context, fn func(*stream) error) error { i.streamsMtx.RLock() defer i.streamsMtx.RUnlock() + var chunkFilter storage.ChunkFilterer + if i.chunkFilter != nil { + chunkFilter = i.chunkFilter.ForRequest(ctx) + } for _, stream := range i.streams { + if chunkFilter != nil && chunkFilter.ShouldFilter(stream.labels) { + continue + } err := fn(stream) if err != nil { return err @@ -442,6 +457,7 @@ func (i *instance) forAllStreams(fn func(*stream) error) error { // forMatchingStreams will execute a function for each stream that satisfies a set of requirements (time range, matchers, etc). // It uses a function in order to enable generic stream access without accidentally leaking streams under the mutex. func (i *instance) forMatchingStreams( + ctx context.Context, matchers []*labels.Matcher, fn func(*stream) error, ) error { @@ -450,7 +466,10 @@ func (i *instance) forMatchingStreams( filters, matchers := cutil.SplitFiltersAndMatchers(matchers) ids := i.index.Lookup(matchers) - + var chunkFilter storage.ChunkFilterer + if i.chunkFilter != nil { + chunkFilter = i.chunkFilter.ForRequest(ctx) + } outer: for _, streamID := range ids { stream, ok := i.streamsByFP[streamID] @@ -462,7 +481,9 @@ outer: continue outer } } - + if chunkFilter != nil && chunkFilter.ShouldFilter(stream.labels) { + continue + } err := fn(stream) if err != nil { return err @@ -471,8 +492,8 @@ outer: return nil } -func (i *instance) addNewTailer(t *tailer) error { - if err := i.forMatchingStreams(t.matchers, func(s *stream) error { +func (i *instance) addNewTailer(ctx context.Context, t *tailer) error { + if err := i.forMatchingStreams(ctx, t.matchers, func(s *stream) error { s.addTailer(t) return nil }); err != nil { @@ -494,8 +515,15 @@ func (i *instance) addTailersToNewStream(stream *stream) { if t.isClosed() { continue } + var chunkFilter storage.ChunkFilterer + if i.chunkFilter != nil { + chunkFilter = i.chunkFilter.ForRequest(t.conn.Context()) + } if isMatching(stream.labels, t.matchers) { + if chunkFilter != nil && chunkFilter.ShouldFilter(stream.labels) { + continue + } stream.addTailer(t) } } diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index c7e04fc255a0..0998b48e20c2 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/storage" loki_runtime "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -39,7 +40,7 @@ func TestLabelsCollisions(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - i := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, nil, &OnceSwitch{}) + i := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, nil, &OnceSwitch{}, nil) // avoid entries from the future. tt := time.Now().Add(-5 * time.Minute) @@ -66,7 +67,7 @@ func TestConcurrentPushes(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - inst := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}) + inst := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil) const ( concurrent = 10 @@ -124,7 +125,7 @@ func TestSyncPeriod(t *testing.T) { minUtil = 0.20 ) - inst := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}) + inst := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil) lbls := makeRandomLabels() tt := time.Now() @@ -164,7 +165,7 @@ func Test_SeriesQuery(t *testing.T) { cfg.SyncPeriod = 1 * time.Minute cfg.SyncMinUtilization = 0.20 - instance := newInstance(cfg, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}) + instance := newInstance(cfg, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil) currentTime := time.Now() @@ -274,7 +275,7 @@ func Benchmark_PushInstance(b *testing.B) { require.NoError(b, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - i := newInstance(&Config{}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}) + i := newInstance(&Config{}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil) ctx := context.Background() for n := 0; n < b.N; n++ { @@ -316,7 +317,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) { ctx := context.Background() - inst := newInstance(&Config{}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}) + inst := newInstance(&Config{}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil) t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil) require.NoError(b, err) for i := 0; i < 10000; i++ { @@ -326,7 +327,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) { } b.Run("addNewTailer", func(b *testing.B) { for n := 0; n < b.N; n++ { - _ = inst.addNewTailer(t) + _ = inst.addNewTailer(context.Background(), t) } }) lbs := makeRandomLabels() @@ -366,13 +367,14 @@ func Test_Iterator(t *testing.T) { defaultLimits := defaultLimitsTestConfig() overrides, err := validation.NewOverrides(defaultLimits, nil) require.NoError(t, err) - instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil) + instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil) ctx := context.TODO() direction := logproto.BACKWARD limit := uint32(2) // insert data. for i := 0; i < 10; i++ { + // nolint stream := "dispatcher" if i%2 == 0 { stream = "worker" @@ -431,6 +433,71 @@ func Test_Iterator(t *testing.T) { require.Equal(t, int64(8), res.Streams[1].Entries[0].Timestamp.UnixNano()) } +type testFilter struct{} + +func (t *testFilter) ForRequest(ctx context.Context) storage.ChunkFilterer { + return t +} + +func (t *testFilter) ShouldFilter(lbs labels.Labels) bool { + return lbs.Get("log_stream") == "dispatcher" +} + +func Test_ChunkFilter(t *testing.T) { + ingesterConfig := defaultIngesterTestConfig(t) + defaultLimits := defaultLimitsTestConfig() + overrides, err := validation.NewOverrides(defaultLimits, nil) + require.NoError(t, err) + instance := newInstance( + &ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, &testFilter{}) + ctx := context.TODO() + direction := logproto.BACKWARD + limit := uint32(2) + + // insert data. + for i := 0; i < 10; i++ { + stream := "dispatcher" + if i%2 == 0 { + stream = "worker" + } + require.NoError(t, + instance.Push(ctx, &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: fmt.Sprintf(`{host="agent", log_stream="%s",job="3"}`, stream), + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, int64(i)), Line: fmt.Sprintf(`msg="%s_%d"`, stream, i)}, + }, + }, + }, + }), + ) + } + + // prepare iterators. + itrs, err := instance.Query(ctx, + logql.SelectLogParams{ + QueryRequest: &logproto.QueryRequest{ + Selector: `{job="3"}`, + Limit: limit, + Start: time.Unix(0, 0), + End: time.Unix(0, 100000000), + Direction: direction, + }, + }, + ) + require.NoError(t, err) + it := iter.NewHeapIterator(ctx, itrs, direction) + defer it.Close() + + for it.Next() { + require.NoError(t, it.Error()) + lbs, err := logql.ParseLabels(it.Labels()) + require.NoError(t, err) + require.NotEqual(t, "dispatcher", lbs.Get("log_stream")) + } +} + type fakeQueryServer func(*logproto.QueryResponse) error func (f fakeQueryServer) Send(res *logproto.QueryResponse) error {