Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Handle SIGHUP #2139

Merged
merged 3 commits into from
Feb 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name
Short('i').Default(verifier.IndexIssueID, verifier.OverlappedBlocksIssueID).Strings()
idWhitelist := cmd.Flag("id-whitelist", "Block IDs to verify (and optionally repair) only. "+
"If none is specified, all blocks will be verified. Repeated field").Strings()
m[name+" verify"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
m[name+" verify"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
Expand Down Expand Up @@ -167,7 +167,7 @@ func registerBucketLs(m map[string]setupFunc, root *kingpin.CmdClause, name stri
cmd := root.Command("ls", "List all blocks in the bucket")
output := cmd.Flag("output", "Optional format in which to print each block's information. Options are 'json', 'wide' or a custom template.").
Short('o').Default("").String()
m[name+" ls"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
m[name+" ls"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
Expand Down Expand Up @@ -260,7 +260,7 @@ func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name
Default("FROM", "UNTIL").Enums(inspectColumns...)
timeout := cmd.Flag("timeout", "Timeout to download metadata from remote storage").Default("5m").Duration()

m[name+" inspect"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
m[name+" inspect"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {

// Parse selector.
selectorLabels, err := parseFlagLabels(*selector)
Expand Down Expand Up @@ -316,7 +316,7 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
timeout := cmd.Flag("timeout", "Timeout to download metadata from remote storage").Default("5m").Duration()
label := cmd.Flag("label", "Prometheus label to use as timeline title").String()

m[name+" web"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
m[name+" web"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
ctx, cancel := context.WithCancel(context.Background())

comp := component.Bucket
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func registerCheckRules(m map[string]setupFunc, root *kingpin.CmdClause, name st
"The rule files to check.",
).Required().ExistingFiles()

m[name+" rules"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
m[name+" rules"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
// Dummy actor to immediately kill the group after the run function returns.
g.Add(func() error { return nil }, func(error) {})
return checkRulesFiles(logger, ruleFiles)
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {

selectorRelabelConf := regSelectorRelabelFlags(cmd)

m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
time.Duration(*httpGracePeriod),
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func registerDownsample(m map[string]setupFunc, app *kingpin.Application) {

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
return runDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, comp)
}
}
Expand Down
35 changes: 33 additions & 2 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
logFormatJson = "json"
)

type setupFunc func(*run.Group, log.Logger, *prometheus.Registry, opentracing.Tracer, bool) error
type setupFunc func(*run.Group, log.Logger, *prometheus.Registry, opentracing.Tracer, <-chan struct{}, bool) error

func main() {
if os.Getenv("DEBUG") != "" {
Expand Down Expand Up @@ -178,7 +178,10 @@ func main() {
})
}

if err := cmds[cmd](&g, logger, metrics, tracer, *logLevel == "debug"); err != nil {
// Create a signal channel to dispatch reload events to sub-commands.
reloadCh := make(chan struct{}, 1)

if err := cmds[cmd](&g, logger, metrics, tracer, reloadCh, *logLevel == "debug"); err != nil {
level.Error(logger).Log("err", errors.Wrapf(err, "%s command failed", cmd))
os.Exit(1)
}
Expand All @@ -193,6 +196,16 @@ func main() {
})
}

// Listen for reload signals.
{
cancel := make(chan struct{})
g.Add(func() error {
return reload(logger, cancel, reloadCh)
}, func(error) {
close(cancel)
})
}

if err := g.Run(); err != nil {
level.Error(logger).Log("msg", "running command failed", "err", err)
os.Exit(1)
Expand All @@ -211,3 +224,21 @@ func interrupt(logger log.Logger, cancel <-chan struct{}) error {
return errors.New("canceled")
}
}

func reload(logger log.Logger, cancel <-chan struct{}, r chan<- struct{}) error {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGHUP)
for {
select {
case s := <-c:
level.Info(logger).Log("msg", "caught signal. Reloading.", "signal", s)
select {
case r <- struct{}{}:
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
level.Info(logger).Log("msg", "relaod dispatched.")
default:
}
case <-cancel:
return errors.New("canceled")
}
}
}
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {

storeResponseTimeout := modelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms"))

m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
selectorLset, err := parseFlagLabels(*selectorLabels)
if err != nil {
return errors.Wrap(err, "parse federation labels")
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {

walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool()

m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
lset, err := parseFlagLabels(*labelStrs)
if err != nil {
return errors.Wrap(err, "parse labels")
Expand Down
29 changes: 4 additions & 25 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@ import (
"math/rand"
"net/http"
"net/url"
"os"
"os/signal"
"path"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -116,7 +113,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) {
dnsSDResolver := cmd.Flag("query.sd-dns-resolver", "Resolver to use. Possible options: [golang, miekgdns]").
Default("golang").Hidden().String()

m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, reload <-chan struct{}, _ bool) error {
lset, err := parseFlagLabels(*labelStrs)
if err != nil {
return errors.Wrap(err, "parse labels")
Expand Down Expand Up @@ -168,6 +165,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) {
logger,
reg,
tracer,
reload,
lset,
*alertmgrs,
*alertmgrsTimeout,
Expand Down Expand Up @@ -209,6 +207,7 @@ func runRule(
logger log.Logger,
reg *prometheus.Registry,
tracer opentracing.Tracer,
reloadSignal <-chan struct{},
lset labels.Labels,
alertmgrURLs []string,
alertmgrsTimeout time.Duration,
Expand Down Expand Up @@ -484,6 +483,7 @@ func runRule(
case <-cancel:
return errors.New("canceled")
case <-reload:
case <-reloadSignal:
}

level.Debug(logger).Log("msg", "configured rule files", "files", strings.Join(ruleFiles, ","))
Expand Down Expand Up @@ -520,27 +520,6 @@ func runRule(
close(cancel)
})
}
{
cancel := make(chan struct{})

g.Add(func() error {
c := make(chan os.Signal, 1)
for {
signal.Notify(c, syscall.SIGHUP)
select {
case <-c:
select {
case reload <- struct{}{}:
default:
}
case <-cancel:
return errors.New("canceled")
}
}
}, func(error) {
close(cancel)
})
}

grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) {
minTime := thanosmodel.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))

m[component.Sidecar.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[component.Sidecar.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
rl := reloader.New(
log.With(logger, "component", "reloader"),
reloader.ReloadURLFromBase(*promURL),
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it will be out of experimental stage.").
Hidden().Default("false").Bool()

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
minTime, maxTime)
Expand Down