Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rule: do not turn off if resolving fails #7192

Merged
merged 1 commit into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func runRule(
queryClients = append(queryClients, queryClient)
promClients = append(promClients, promclient.NewClient(queryClient, logger, "thanos-rule"))
// Discover and resolve query addresses.
addDiscoveryGroups(g, queryClient, conf.query.dnsSDInterval)
addDiscoveryGroups(g, queryClient, conf.query.dnsSDInterval, logger)
}

if cfg.GRPCConfig != nil {
Expand Down Expand Up @@ -578,7 +578,7 @@ func runRule(
return err
}
// Discover and resolve Alertmanager addresses.
addDiscoveryGroups(g, amClient, conf.alertmgr.alertmgrsDNSSDInterval)
addDiscoveryGroups(g, amClient, conf.alertmgr.alertmgrsDNSSDInterval, logger)

alertmgrs = append(alertmgrs, alert.NewAlertmanager(logger, amClient, time.Duration(cfg.Timeout), cfg.APIVersion))
}
Expand Down Expand Up @@ -981,7 +981,7 @@ func queryFuncCreator(
}
}

func addDiscoveryGroups(g *run.Group, c *clientconfig.HTTPClient, interval time.Duration) {
func addDiscoveryGroups(g *run.Group, c *clientconfig.HTTPClient, interval time.Duration, logger log.Logger) {
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
c.Discover(ctx)
Expand All @@ -991,9 +991,10 @@ func addDiscoveryGroups(g *run.Group, c *clientconfig.HTTPClient, interval time.
})

g.Add(func() error {
return runutil.Repeat(interval, ctx.Done(), func() error {
runutil.RepeatInfinitely(logger, interval, ctx.Done(), func() error {
return c.Resolve(ctx)
})
return nil
}, func(error) {
cancel()
})
Expand Down
17 changes: 17 additions & 0 deletions pkg/runutil/runutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,23 @@ import (
"github.com/thanos-io/thanos/pkg/errutil"
)

// RepeatInfinitely executes f every interval seconds until stopc is closed or f returns an error.
func RepeatInfinitely(logger log.Logger, interval time.Duration, stopc <-chan struct{}, f func() error) {
tick := time.NewTicker(interval)
defer tick.Stop()

for {
if err := f(); err != nil {
level.Error(logger).Log("msg", "function failed. Retrying in next tick", "err", err)
}
select {
case <-stopc:
return
case <-tick.C:
}
}
}

// Repeat executes f every interval seconds until stopc is closed or f returns an error.
// It executes f once right after being called.
func Repeat(interval time.Duration, stopc <-chan struct{}, f func() error) error {
Expand Down
Loading