-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement stateless mode for Thanos Ruler #4250
Closed
Closed
Changes from all commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
64a9700
import wal package from grafana/agent (with string interning disabled)
idoqo f651423
Set up remote-write config and test skeleton
idoqo 0e2b82f
Setup fanout and related storages for stateless ruler
idoqo 02f8981
Optionally run ruler in stateless mode
idoqo fc8f103
Set up tests and implementations for configuring remote-write for ruler
idoqo 44b6219
Implement stub querier for WAL storage to fix nil pointer error
idoqo a5e3a78
Setup e2e test for stateless ruler
idoqo f05fabd
Add copied code commentary to remotewrite packages
idoqo 7b87e52
Use static addresses for am and querier
idoqo 2729902
Remove need for separate remote-write flag for stateless ruler
idoqo 9e033d0
Generate docs for stateless ruler flags and fix tests
idoqo 64f4c26
Use promauto for prometheus primitives
idoqo 0290a3d
Group imports and satisfy go-lint
idoqo 77ee337
Always return empty series set from WAL storage
idoqo 933a875
re-generate rule documentation
idoqo 41d3408
copyright headers to satisfy golint
idoqo 79b06d3
Rename wal storage metrics
idoqo 6b0612c
Use Prometheus' remote write config instead of rolling another
idoqo File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,10 +26,12 @@ import ( | |
"github.com/prometheus/client_golang/prometheus/promauto" | ||
"github.com/prometheus/common/model" | ||
"github.com/prometheus/common/route" | ||
"github.com/prometheus/prometheus/config" | ||
"github.com/prometheus/prometheus/pkg/labels" | ||
"github.com/prometheus/prometheus/pkg/relabel" | ||
"github.com/prometheus/prometheus/promql" | ||
"github.com/prometheus/prometheus/rules" | ||
"github.com/prometheus/prometheus/storage" | ||
"github.com/prometheus/prometheus/tsdb" | ||
"github.com/prometheus/prometheus/util/strutil" | ||
"github.com/thanos-io/thanos/pkg/errutil" | ||
|
@@ -50,6 +52,7 @@ import ( | |
"github.com/thanos-io/thanos/pkg/promclient" | ||
"github.com/thanos-io/thanos/pkg/query" | ||
thanosrules "github.com/thanos-io/thanos/pkg/rules" | ||
"github.com/thanos-io/thanos/pkg/rules/remotewrite" | ||
"github.com/thanos-io/thanos/pkg/runutil" | ||
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" | ||
httpserver "github.com/thanos-io/thanos/pkg/server/http" | ||
|
@@ -75,6 +78,8 @@ type ruleConfig struct { | |
alertQueryURL *url.URL | ||
alertRelabelConfigYAML []byte | ||
|
||
rwConfig *extflag.PathOrContent | ||
|
||
resendDelay time.Duration | ||
evalInterval time.Duration | ||
ruleFiles []string | ||
|
@@ -117,6 +122,8 @@ func registerRule(app *extkingpin.App) { | |
cmd.Flag("eval-interval", "The default evaluation interval to use."). | ||
Default("30s").DurationVar(&conf.evalInterval) | ||
|
||
conf.rwConfig = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write server where samples should be sent to (see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). This automatically enables stateless mode for ruler and no series will be stored in the ruler's TSDB. If an empty config (or file) is provided, the flag is ignored and ruler is run with its own TSDB.", extflag.WithEnvSubstitution()) | ||
|
||
reqLogDecision := cmd.Flag("log.request.decision", "Deprecation Warning - This flag would be soon deprecated, and replaced with `request.logging-config`. Request Logging for logging the start and end of requests. By default this flag is disabled. LogFinishCall: Logs the finish call of the requests. LogStartAndFinishCall: Logs the start and finish call of the requests. NoLogCall: Disable request logging.").Default("").Enum("NoLogCall", "LogFinishCall", "LogStartAndFinishCall", "") | ||
|
||
conf.objStoreConfig = extkingpin.RegisterCommonObjStoreFlags(cmd, "", false) | ||
|
@@ -319,25 +326,52 @@ func runRule( | |
// Discover and resolve query addresses. | ||
addDiscoveryGroups(g, queryClient, conf.query.dnsSDInterval) | ||
} | ||
var ( | ||
appendable storage.Appendable | ||
queryable storage.Queryable | ||
db *tsdb.DB | ||
) | ||
|
||
db, err := tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil) | ||
rwCfgYAML, err := conf.rwConfig.Content() | ||
if err != nil { | ||
return errors.Wrap(err, "open TSDB") | ||
return err | ||
} | ||
|
||
level.Debug(logger).Log("msg", "removing storage lock file if any") | ||
if err := removeLockfileIfAny(logger, conf.dataDir); err != nil { | ||
return errors.Wrap(err, "remove storage lock files") | ||
} | ||
if len(rwCfgYAML) > 0 { | ||
var rwCfg config.RemoteWriteConfig | ||
rwCfg, err = remotewrite.LoadRemoteWriteConfig(rwCfgYAML) | ||
if err != nil { | ||
return err | ||
} | ||
walDir := filepath.Join(conf.dataDir, rwCfg.Name) | ||
remoteStore, err := remotewrite.NewFanoutStorage(logger, reg, walDir, &rwCfg) | ||
if err != nil { | ||
return errors.Wrap(err, "set up remote-write store for ruler") | ||
} | ||
appendable = remoteStore | ||
queryable = remoteStore | ||
} else { | ||
db, err = tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil) | ||
if err != nil { | ||
return errors.Wrap(err, "open TSDB") | ||
} | ||
|
||
{ | ||
done := make(chan struct{}) | ||
g.Add(func() error { | ||
<-done | ||
return db.Close() | ||
}, func(error) { | ||
close(done) | ||
}) | ||
level.Debug(logger).Log("msg", "removing storage lock file if any") | ||
if err := removeLockfileIfAny(logger, conf.dataDir); err != nil { | ||
return errors.Wrap(err, "remove storage lock files") | ||
} | ||
|
||
{ | ||
done := make(chan struct{}) | ||
g.Add(func() error { | ||
<-done | ||
return db.Close() | ||
}, func(error) { | ||
close(done) | ||
}) | ||
} | ||
appendable = db | ||
queryable = db | ||
} | ||
|
||
// Build the Alertmanager clients. | ||
|
@@ -435,9 +469,9 @@ func runRule( | |
rules.ManagerOptions{ | ||
NotifyFunc: notifyFunc, | ||
Logger: logger, | ||
Appendable: db, | ||
Appendable: appendable, | ||
ExternalURL: nil, | ||
Queryable: db, | ||
Queryable: queryable, | ||
ResendDelay: conf.resendDelay, | ||
}, | ||
queryFuncCreator(logger, queryClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings, conf.query.httpMethod), | ||
|
@@ -522,7 +556,7 @@ func runRule( | |
) | ||
|
||
// Start gRPC server. | ||
{ | ||
if db != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think I missed it last time. Even if it is stateless mode, we still need to start the gRPC server, because we need the prober and the Rules API. Example code: // Start gRPC server.
tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpc.tlsSrvCert, conf.grpc.tlsSrvKey, conf.grpc.tlsSrvClientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
options := []grpcserver.Option{
grpcserver.WithServer(thanosrules.RegisterRulesServer(ruleMgr)),
grpcserver.WithListen(conf.grpc.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
}
if db != nil {
tsdbStore := store.NewTSDBStore(logger, db, component.Rule, conf.lset)
options = append(options, grpcserver.WithServer(store.RegisterStoreServer(tsdbStore)))
}
// TODO: Add rules API implementation when ready.
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, options...)
g.Add(func() error {
statusProber.Ready()
return s.ListenAndServe()
}, func(err error) {
statusProber.NotReady(err)
s.Shutdown(err)
}) |
||
tsdbStore := store.NewTSDBStore(logger, db, component.Rule, conf.lset) | ||
|
||
tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpc.tlsSrvCert, conf.grpc.tlsSrvKey, conf.grpc.tlsSrvClientCA) | ||
|
@@ -547,6 +581,7 @@ func runRule( | |
s.Shutdown(err) | ||
}) | ||
} | ||
|
||
// Start UI & metrics HTTP server. | ||
{ | ||
router := route.New() | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package remotewrite | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/pkg/errors" | ||
|
||
"github.com/go-kit/kit/log" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/prometheus/config" | ||
"github.com/prometheus/prometheus/storage" | ||
"github.com/prometheus/prometheus/storage/remote" | ||
"gopkg.in/yaml.v2" | ||
) | ||
|
||
// LoadRemoteWriteConfig prepares a RemoteWriteConfig instance from a given YAML config. | ||
// It unmarshals configYAML into a single config.RemoteWriteConfig value with yaml.Unmarshal. | ||
// On failure the unmarshalling error is returned unwrapped, together with cfg as it stands at that point. | ||
func LoadRemoteWriteConfig(configYAML []byte) (config.RemoteWriteConfig, error) { | ||
var cfg config.RemoteWriteConfig | ||
if err := yaml.Unmarshal(configYAML, &cfg); err != nil { | ||
return cfg, err | ||
} | ||
return cfg, nil | ||
} | ||
|
||
// NewFanoutStorage creates a storage that fans-out to both the WAL and a configured remote storage. | ||
// The remote storage tails the WAL and sends the metrics it reads using Prometheus' remote_write. | ||
// | ||
// walDir is passed to NewStorage to create the WAL-backed store. rwConfig is applied to the remote | ||
// storage as its only remote-write config, alongside config.DefaultGlobalConfig. | ||
// An error from NewStorage is returned as-is; an error from ApplyConfig is wrapped with context. | ||
func NewFanoutStorage(logger log.Logger, reg prometheus.Registerer, walDir string, rwConfig *config.RemoteWriteConfig) (storage.Storage, error) { | ||
walStore, err := NewStorage(logger, reg, walDir) | ||
if err != nil { | ||
return nil, err | ||
} | ||
// The remote storage is given the WAL store's start-time function and directory. | ||
// NOTE(review): the 1 minute duration and the nil last argument are presumably the flush deadline and scrape manager; confirm against the Prometheus version in use. | ||
remoteStore := remote.NewStorage(logger, reg, walStore.StartTime, walStore.Directory(), 1*time.Minute, nil) | ||
yeya24 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err := remoteStore.ApplyConfig(&config.Config{ | ||
GlobalConfig: config.DefaultGlobalConfig, | ||
RemoteWriteConfigs: []*config.RemoteWriteConfig{rwConfig}, | ||
}); err != nil { | ||
// NOTE(review): walStore and remoteStore are not closed on this error path; confirm whether they hold resources that should be released here. | ||
return nil, errors.Wrap(err, "applying config to remote storage") | ||
} | ||
return storage.NewFanout(logger, walStore, remoteStore), nil | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@squat @bwplotka
In this case though, an empty `--remote-write.config` (or `--remote-write.config-file`) will fall back to using the TSDB-backed ruler. This is because `len(rwCfgYAML) == 0` either means that the flag was not provided at all, or that it was provided with empty content (or an empty file). I couldn't think of a reliable way to distinguish between them. I've also documented this behavior in the flag help, but I don't know if it is something we want. An alternative would be to provide a separate flag for enabling stateless mode (e.g. `--remote.write` or `--remote-write.enabled`).