Skip to content

Commit

Permalink
Invalidate caches on delete (#5661)
Browse files Browse the repository at this point in the history
* Generate cache invalidation numbers in the delete store

* Get cache generation numbers from the store on request

* changlog

* rename tombstones to something more meaningful

* User invisible module

* query frontend relies on a compactor to get the cache generation number

* fix serialization

* source -> name

* lint errors

* lint errors

* log non-200 responses

* add jsonnet changes

* lint

* review feedback

* review feedback

* client rename
  • Loading branch information
MasslessParticle authored Apr 21, 2022
1 parent 3fa6cc9 commit 9cef86b
Show file tree
Hide file tree
Showing 20 changed files with 543 additions and 62 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ to include only the most relevant.
* [5543](https://github.com/grafana/loki/pull/5543) **cyriltovena**: update loki go version to 1.17.8
* [5450](https://github.com/grafana/loki/pull/5450) **BenoitKnecht**: pkg/ruler/base: Add external_labels option
* [5484](https://github.com/grafana/loki/pull/5450) **sandeepsukhani**: Add support for per user index query readiness with limits overrides
* [5661](https://github.com/grafana/loki/pull/5450) **masslessparticle**: Invalidate caches on deletes
* [5358](https://github.com/grafana/loki/pull/5358) **DylanGuedes**: Add `RingMode` support to `IndexGateway`
* [5435](https://github.com/grafana/loki/pull/5435) **slim-bean**: set match_max_concurrent true by default
* [5361](https://github.com/grafana/loki/pull/5361) **cyriltovena**: Add usage report into Loki.
Expand Down
9 changes: 9 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,10 @@ The `frontend` block configures the Loki query-frontend.
# CLI flag: -frontend.downstream-url
[downstream_url: <string> | default = ""]
# Address, including port, where the compactor api is served
# CLI flag: -frontend.compactor-address
[compactor_address: <string> | default = ""]
# Log queries that are slower than the specified duration. Set to 0 to disable.
# Set to < 0 to enable on all queries.
# CLI flag: -frontend.log-queries-longer-than
Expand Down Expand Up @@ -2041,6 +2045,11 @@ compacts index shards to more performant forms.
# CLI flag: -boltdb.shipper.compactor.delete-request-cancel-period
[delete_request_cancel_period: <duration> | default = 24h]
# Which deletion mode to use. Supported values are: disabled,
# whole-stream-deletion, filter-only, filter-and-delete
# CLI flag: -boltdb.shipper.compactor.deletion-mode
[deletion_mode: <string> | default = "whole-stream-deletion"]
# Maximum number of tables to compact in parallel.
# While increasing this value, please make sure compactor has enough disk space
# allocated to be able to store and compact as many tables.
Expand Down
42 changes: 36 additions & 6 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,17 @@ import (
"os"
"time"

"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/tenant"
gerrors "github.com/pkg/errors"

"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/runtimeconfig"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
gerrors "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/version"
Expand Down Expand Up @@ -53,6 +52,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/generationnumber"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
"github.com/grafana/loki/pkg/storage/stores/shipper/uploads"
Expand Down Expand Up @@ -494,11 +494,17 @@ func (disabledShuffleShardingLimits) MaxQueriersPerUser(userID string) int { ret
func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
level.Debug(util_log.Logger).Log("msg", "initializing query frontend tripperware")

cacheGenClient, err := t.cacheGenClient()
if err != nil {
return nil, err
}

tripperware, stopper, err := queryrange.NewTripperware(
t.Cfg.QueryRange,
util_log.Logger,
t.overrides,
t.Cfg.SchemaConfig,
generationnumber.NewGenNumberLoader(cacheGenClient, prometheus.DefaultRegisterer),
prometheus.DefaultRegisterer,
)
if err != nil {
Expand All @@ -510,6 +516,29 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
return services.NewIdleService(nil, nil), nil
}

func (t *Loki) cacheGenClient() (generationnumber.CacheGenClient, error) {
filteringEnabled, err := deletion.FilteringEnabled(t.Cfg.CompactorConfig.DeletionMode)
if err != nil {
return nil, err
}

if !filteringEnabled {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

compactorAddress := t.Cfg.Frontend.CompactorAddress
if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) {
// In single binary or read modes, this module depends on Server
compactorAddress = fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort)
}

if compactorAddress == "" {
return nil, errors.New("query filtering for deletes requires 'compactor_address' to be configured")
}

return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
}

func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
level.Debug(util_log.Logger).Log("msg", "initializing query frontend", "config", fmt.Sprintf("%+v", t.Cfg.Frontend))

Expand Down Expand Up @@ -766,11 +795,12 @@ func (t *Loki) initCompactor() (services.Service, error) {

t.Server.HTTP.Path("/compactor/ring").Methods("GET", "POST").Handler(t.compactor)

// TODO: update this when the other deletion modes are available
if t.Cfg.CompactorConfig.RetentionEnabled && t.compactor.DeleteMode() == deletion.WholeStreamDeletion {
if t.Cfg.CompactorConfig.RetentionEnabled && t.compactor.DeleteMode() != deletion.Disabled {
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))

t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler)))
}

return t.compactor, nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/lokifrontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Config struct {

CompressResponses bool `yaml:"compress_responses"`
DownstreamURL string `yaml:"downstream_url"`
CompactorAddress string `yaml:"compactor_address"`

TailProxyURL string `yaml:"tail_proxy_url"`
}
Expand All @@ -27,6 +28,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.")
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")

f.StringVar(&cfg.CompactorAddress, "frontend.compactor-address", "", "host and port where the compactor API is listening")
f.StringVar(&cfg.TailProxyURL, "frontend.tail-proxy-url", "", "URL of querier for tail proxy.")
}
4 changes: 2 additions & 2 deletions pkg/querier/queryrange/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func Test_seriesLimiter(t *testing.T) {
cfg.CacheResults = false
// split in 7 with 2 in // max.
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour)
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -237,7 +237,7 @@ func Test_MaxQueryLookBack(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{
maxQueryLookback: 1 * time.Hour,
maxQueryParallelism: 1,
}, config.SchemaConfig{}, nil)
}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func NewTripperware(
log log.Logger,
limits Limits,
schema config.SchemaConfig,
cacheGenNumLoader queryrangebase.CacheGenNumberLoader,
registerer prometheus.Registerer,
) (queryrangebase.Tripperware, Stopper, error) {
metrics := NewMetrics(registerer)
Expand All @@ -61,7 +62,7 @@ func NewTripperware(
}

metricsTripperware, err := NewMetricTripperware(cfg, log, limits, schema, LokiCodec, c,
PrometheusExtractor{}, metrics, registerer)
cacheGenNumLoader, PrometheusExtractor{}, metrics, registerer)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -387,6 +388,7 @@ func NewMetricTripperware(
schema config.SchemaConfig,
codec queryrangebase.Codec,
c cache.Cache,
cacheGenNumLoader queryrangebase.CacheGenNumberLoader,
extractor queryrangebase.Extractor,
metrics *Metrics,
registerer prometheus.Registerer,
Expand Down Expand Up @@ -414,7 +416,7 @@ func NewMetricTripperware(
limits,
codec,
extractor,
nil,
cacheGenNumLoader,
func(r queryrangebase.Request) bool {
return !r.GetCachingOptions().Disabled
},
Expand Down
20 changes: 10 additions & 10 deletions pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ var (
// those tests are mostly for testing the glue between all component and make sure they activate correctly.
func TestMetricsTripperware(t *testing.T) {
l := WithSplitByLimits(fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1}, 4*time.Hour)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestMetricsTripperware(t *testing.T) {
}

func TestLogFilterTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestLogFilterTripperware(t *testing.T) {
func TestInstantQueryTripperware(t *testing.T) {
testShardingConfig := testConfig
testShardingConfig.ShardedQueries = true
tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestInstantQueryTripperware(t *testing.T) {
}

func TestSeriesTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -298,7 +298,7 @@ func TestSeriesTripperware(t *testing.T) {
}

func TestLabelsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -344,7 +344,7 @@ func TestLabelsTripperware(t *testing.T) {
}

func TestLogNoRegex(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -378,7 +378,7 @@ func TestLogNoRegex(t *testing.T) {
}

func TestUnhandledPath(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand All @@ -403,7 +403,7 @@ func TestUnhandledPath(t *testing.T) {

func TestRegexpParamsSupport(t *testing.T) {
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, 4*time.Hour)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -486,7 +486,7 @@ func TestPostQueries(t *testing.T) {
}

func TestEntriesLimitsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -517,7 +517,7 @@ func TestEntriesLimitsTripperware(t *testing.T) {
}

func TestEntriesLimitWithZeroTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig config.Schem
return err
}

if c.deleteMode == deletion.WholeStreamDeletion {
if c.deleteMode != deletion.Disabled {
deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")

c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,46 +14,6 @@ import (

const testUserID = "test-user"

type mockDeleteRequestsStore struct {
deleteRequests []DeleteRequest
}

func (m mockDeleteRequestsStore) GetDeleteRequestsByStatus(ctx context.Context, status DeleteRequestStatus) ([]DeleteRequest, error) {
return m.deleteRequests, nil
}

func (m mockDeleteRequestsStore) UpdateStatus(ctx context.Context, userID, requestID string, newStatus DeleteRequestStatus) error {
return nil
}

func (m mockDeleteRequestsStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, query string) error {
panic("implement me")
}

func (m mockDeleteRequestsStore) GetDeleteRequestsForUserByStatus(ctx context.Context, userID string, status DeleteRequestStatus) ([]DeleteRequest, error) {
panic("implement me")
}

func (m mockDeleteRequestsStore) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
panic("implement me")
}

func (m mockDeleteRequestsStore) GetDeleteRequest(ctx context.Context, userID, requestID string) (*DeleteRequest, error) {
panic("implement me")
}

func (m mockDeleteRequestsStore) GetPendingDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
panic("implement me")
}

func (m mockDeleteRequestsStore) RemoveDeleteRequest(ctx context.Context, userID, requestID string, createdAt, startTime, endTime model.Time) error {
panic("implement me")
}

func (m mockDeleteRequestsStore) Stop() {
panic("implement me")
}

func TestDeleteRequestsManager_Expired(t *testing.T) {
type resp struct {
isExpired bool
Expand Down Expand Up @@ -256,3 +216,12 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
})
}
}

type mockDeleteRequestsStore struct {
DeleteRequestsStore
deleteRequests []DeleteRequest
}

func (m mockDeleteRequestsStore) GetDeleteRequestsByStatus(_ context.Context, _ DeleteRequestStatus) ([]DeleteRequest, error) {
return m.deleteRequests, nil
}
Loading

0 comments on commit 9cef86b

Please sign in to comment.