diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index 1aed48e7818..6967229b057 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -80,7 +80,7 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name Short('i').Default(verifier.IndexIssueID, verifier.OverlappedBlocksIssueID).Strings() idWhitelist := cmd.Flag("id-whitelist", "Block IDs to verify (and optionally repair) only. "+ "If none is specified, all blocks will be verified. Repeated field").Strings() - m[name+" verify"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error { + m[name+" verify"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error { confContentYaml, err := objStoreConfig.Content() if err != nil { return err @@ -167,7 +167,7 @@ func registerBucketLs(m map[string]setupFunc, root *kingpin.CmdClause, name stri cmd := root.Command("ls", "List all blocks in the bucket") output := cmd.Flag("output", "Optional format in which to print each block's information. Options are 'json', 'wide' or a custom template."). Short('o').Default("").String() - m[name+" ls"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error { + m[name+" ls"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error { confContentYaml, err := objStoreConfig.Content() if err != nil { return err @@ -260,7 +260,7 @@ func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name Default("FROM", "UNTIL").Enums(inspectColumns...) timeout := cmd.Flag("timeout", "Timeout to download metadata from remote storage").Default("5m").Duration() - m[name+" inspect"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error { + m[name+" inspect"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error { // Parse selector. selectorLabels, err := parseFlagLabels(*selector) @@ -316,7 +316,7 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str timeout := cmd.Flag("timeout", "Timeout to download metadata from remote storage").Default("5m").Duration() label := cmd.Flag("label", "Prometheus label to use as timeline title").String() - m[name+" web"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error { + m[name+" web"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error { ctx, cancel := context.WithCancel(context.Background()) comp := component.Bucket diff --git a/cmd/thanos/check.go b/cmd/thanos/check.go index d42c12052ce..ae0d43fa275 100644 --- a/cmd/thanos/check.go +++ b/cmd/thanos/check.go @@ -29,7 +29,7 @@ func registerCheckRules(m map[string]setupFunc, root *kingpin.CmdClause, name st "The rule files to check.", ).Required().ExistingFiles() - m[name+" rules"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error { + m[name+" rules"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error { // Dummy actor to immediately kill the group after the run function returns. g.Add(func() error { return nil }, func(error) {}) return checkRulesFiles(logger, ruleFiles) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 4bdd9cc351d..4f4b9a578f2 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -124,7 +124,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) { selectorRelabelConf := regSelectorRelabelFlags(cmd) - m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { + m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { return runCompact(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index b7b65369fa4..d38ea26273c 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -44,7 +44,7 @@ func registerDownsample(m map[string]setupFunc, app *kingpin.Application) { objStoreConfig := regCommonObjStoreFlags(cmd, "", true) - m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { + m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { return runDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, comp) } } diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index 9c38acca4b9..f77424d8ced 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -32,7 +32,7 @@ const ( logFormatJson = "json" ) -type setupFunc func(*run.Group, log.Logger, *prometheus.Registry, opentracing.Tracer, bool) error +type setupFunc func(*run.Group, log.Logger, *prometheus.Registry, opentracing.Tracer, <-chan struct{}, bool) error func main() { if os.Getenv("DEBUG") != "" { @@ -178,7 +178,10 @@ func main() { }) } - if err := cmds[cmd](&g, logger, metrics, tracer, *logLevel == "debug"); err != nil { + // Create a signal channel to dispatch reload events to sub-commands. + reloadCh := make(chan struct{}, 1) + + if err := cmds[cmd](&g, logger, metrics, tracer, reloadCh, *logLevel == "debug"); err != nil { level.Error(logger).Log("err", errors.Wrapf(err, "%s command failed", cmd)) os.Exit(1) } @@ -193,6 +196,16 @@ func main() { }) } + // Listen for reload signals. + { + cancel := make(chan struct{}) + g.Add(func() error { + return reload(logger, cancel, reloadCh) + }, func(error) { + close(cancel) + }) + } + if err := g.Run(); err != nil { level.Error(logger).Log("msg", "running command failed", "err", err) os.Exit(1) @@ -211,3 +224,21 @@ func interrupt(logger log.Logger, cancel <-chan struct{}) error { return errors.New("canceled") } } + +func reload(logger log.Logger, cancel <-chan struct{}, r chan<- struct{}) error { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGHUP) + for { + select { + case s := <-c: + level.Info(logger).Log("msg", "caught signal. Reloading.", "signal", s) + select { + case r <- struct{}{}: + level.Info(logger).Log("msg", "relaod dispatched.") + default: + } + case <-cancel: + return errors.New("canceled") + } + } +} diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index f64584b27d9..9433f5601db 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -101,7 +101,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { storeResponseTimeout := modelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms")) - m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { + m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { selectorLset, err := parseFlagLabels(*selectorLabels) if err != nil { return errors.Wrap(err, "parse federation labels") diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 6f5e2386189..21fa2438056 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -81,7 +81,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool() - m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { + m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { lset, err := parseFlagLabels(*labelStrs) if err != nil { return errors.Wrap(err, "parse labels") diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 7357f7ae485..b275ba8b368 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -8,13 +8,10 @@ import ( "math/rand" "net/http" "net/url" - "os" - "os/signal" "path" "path/filepath" "strconv" "strings" - "syscall" "time" "github.com/go-kit/kit/log" @@ -116,7 +113,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { dnsSDResolver := cmd.Flag("query.sd-dns-resolver", "Resolver to use. Possible options: [golang, miekgdns]"). Default("golang").Hidden().String() - m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { + m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, reload <-chan struct{}, _ bool) error { lset, err := parseFlagLabels(*labelStrs) if err != nil { return errors.Wrap(err, "parse labels") @@ -168,6 +165,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { logger, reg, tracer, + reload, lset, *alertmgrs, *alertmgrsTimeout, @@ -209,6 +207,7 @@ func runRule( logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, + reloadSignal <-chan struct{}, lset labels.Labels, alertmgrURLs []string, alertmgrsTimeout time.Duration, @@ -484,6 +483,7 @@ func runRule( case <-cancel: return errors.New("canceled") case <-reload: + case <-reloadSignal: } level.Debug(logger).Log("msg", "configured rule files", "files", strings.Join(ruleFiles, ",")) @@ -520,27 +520,6 @@ func runRule( close(cancel) }) } - { - cancel := make(chan struct{}) - - g.Add(func() error { - c := make(chan os.Signal, 1) - for { - signal.Notify(c, syscall.SIGHUP) - select { - case <-c: - select { - case reload <- struct{}{}: - default: - } - case <-cancel: - return errors.New("canceled") - } - } - }, func(error) { - close(cancel) - }) - } grpcProbe := prober.NewGRPC() httpProbe := prober.NewHTTP() diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 14d559c49fd..3780ad1812d 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -75,7 +75,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) { minTime := thanosmodel.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). Default("0000-01-01T00:00:00Z")) - m[component.Sidecar.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { + m[component.Sidecar.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { rl := reloader.New( log.With(logger, "component", "reloader"), reloader.ReloadURLFromBase(*promURL), diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index bb2a547d68d..e85f80e67de 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -81,7 +81,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it will be out of experimental stage."). Hidden().Default("false").Bool() - m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error { + m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error { if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() { return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'", minTime, maxTime)