Skip to content

Commit c7be72b

Browse files
committed
Make ruler notifiers multi-tenant
This gives each user their own notifier (with its own notification queue); each notifier adds that user's ID header when sending alerts to Alertmanager.
1 parent f84bc47 commit c7be72b

File tree

1 file changed

+69
-21
lines changed

1 file changed

+69
-21
lines changed

ruler/ruler.go

Lines changed: 69 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,9 @@ package ruler
33
import (
44
"flag"
55
"fmt"
6+
"net/http"
67
"net/url"
8+
"sync"
79
"time"
810

911
"github.com/prometheus/client_golang/prometheus"
@@ -14,6 +16,7 @@ import (
1416
"github.com/prometheus/prometheus/promql"
1517
"github.com/prometheus/prometheus/rules"
1618
"golang.org/x/net/context"
19+
"golang.org/x/net/context/ctxhttp"
1720

1821
"github.com/weaveworks/common/user"
1922
"github.com/weaveworks/cortex/chunk"
@@ -87,30 +90,30 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
8790

8891
// Ruler evaluates rules.
8992
type Ruler struct {
90-
engine *promql.Engine
91-
pusher Pusher
92-
alertURL *url.URL
93-
notifier *notifier.Notifier
93+
engine *promql.Engine
94+
pusher Pusher
95+
alertURL *url.URL
96+
notifierCfg *config.Config
97+
queueCapacity int
98+
99+
// Per-user notifiers with separate queues.
100+
notifiersMtx sync.Mutex
101+
notifiers map[string]*notifier.Notifier
94102
}
95103

96104
// NewRuler creates a new ruler from a distributor and chunk store.
97105
func NewRuler(cfg Config, d *distributor.Distributor, c *chunk.Store) (*Ruler, error) {
98-
n := notifier.New(&notifier.Options{
99-
QueueCapacity: cfg.NotificationQueueCapacity,
100-
})
101106
ncfg, err := buildNotifierConfig(&cfg)
102107
if err != nil {
103108
return nil, err
104109
}
105-
if err = n.ApplyConfig(ncfg); err != nil {
106-
return nil, err
107-
}
108-
go n.Run()
109110
return &Ruler{
110-
engine: querier.NewEngine(d, c),
111-
pusher: d,
112-
alertURL: cfg.ExternalURL.URL,
113-
notifier: n,
111+
engine: querier.NewEngine(d, c),
112+
pusher: d,
113+
alertURL: cfg.ExternalURL.URL,
114+
notifierCfg: ncfg,
115+
queueCapacity: cfg.NotificationQueueCapacity,
116+
notifiers: map[string]*notifier.Notifier{},
114117
}, nil
115118
}
116119

@@ -163,25 +166,65 @@ func buildNotifierConfig(rulerConfig *Config) (*config.Config, error) {
163166
return promConfig, nil
164167
}
165168

166-
func (r *Ruler) newGroup(ctx context.Context, rs []rules.Rule) *rules.Group {
169+
func (r *Ruler) newGroup(ctx context.Context, rs []rules.Rule) (*rules.Group, error) {
167170
appender := appenderAdapter{pusher: r.pusher, ctx: ctx}
171+
userID, err := user.GetID(ctx)
172+
if err != nil {
173+
return nil, err
174+
}
175+
notifier, err := r.getOrCreateNotifier(userID)
176+
if err != nil {
177+
return nil, err
178+
}
168179
opts := &rules.ManagerOptions{
169180
SampleAppender: appender,
170181
QueryEngine: r.engine,
171182
Context: ctx,
172183
ExternalURL: r.alertURL,
173-
Notifier: r.notifier,
184+
Notifier: notifier,
174185
}
175186
delay := 0 * time.Second // Unused, so 0 value is fine.
176-
return rules.NewGroup("default", delay, rs, opts)
187+
return rules.NewGroup("default", delay, rs, opts), nil
188+
}
189+
190+
func (r *Ruler) getOrCreateNotifier(userID string) (*notifier.Notifier, error) {
191+
r.notifiersMtx.Lock()
192+
defer r.notifiersMtx.Unlock()
193+
194+
n, ok := r.notifiers[userID]
195+
if ok {
196+
return n, nil
197+
}
198+
199+
n = notifier.New(&notifier.Options{
200+
QueueCapacity: r.queueCapacity,
201+
Do: func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
202+
req.Header.Set(user.OrgIDHeaderName, userID)
203+
return ctxhttp.Do(ctx, client, req)
204+
},
205+
})
206+
207+
// This should never fail, unless there's a programming mistake.
208+
if err := n.ApplyConfig(r.notifierCfg); err != nil {
209+
return nil, err
210+
}
211+
go n.Run()
212+
213+
// TODO: Remove notifiers for stale users. Right now this is a slow leak.
214+
r.notifiers[userID] = n
215+
return n, nil
177216
}
178217

179218
// Evaluate a list of rules in the given context.
180219
func (r *Ruler) Evaluate(ctx context.Context, rs []rules.Rule) {
181220
log.Debugf("Evaluating %d rules...", len(rs))
182221
start := time.Now()
183-
g := r.newGroup(ctx, rs)
184-
g.Eval()
222+
g, err := r.newGroup(ctx, rs)
223+
if err != nil {
224+
log.Errorf("Failed to create rule group: %v", err)
225+
} else {
226+
g.Eval()
227+
}
185228
// The prometheus routines we're calling have their own instrumentation
186229
// but, a) it's rule-based, not group-based, b) it's a summary, not a
187230
// histogram, so we can't reliably aggregate.
@@ -191,7 +234,12 @@ func (r *Ruler) Evaluate(ctx context.Context, rs []rules.Rule) {
191234

192235
// Stop stops the Ruler.
193236
func (r *Ruler) Stop() {
194-
r.notifier.Stop()
237+
r.notifiersMtx.Lock()
238+
defer r.notifiersMtx.Unlock()
239+
240+
for _, n := range r.notifiers {
241+
n.Stop()
242+
}
195243
}
196244

197245
// Server is a rules server.

0 commit comments

Comments
 (0)