Commit c724737
Implement quorum reads and response merging for /v1/alerts. (cortexproject#4126)
* Implement quorum reads and response merging for /v1/alerts.

  Reading alert listings via /v1/alerts will now read from a quorum of replicas and
  return a merged response. The merging logic returns a union of the alerts from all
  responses. When the same alert is returned by multiple replicas, one is chosen
  arbitrarily. This should be sufficient, as alerts posted to each replica should be
  identical. The replicas may have some disagreement on the alert status, but this
  should become consistent over time. For the /v2 API, we can use the "updatedAt"
  timestamp to choose the most up-to-date status, but this field is not available
  for the /v1 API.

  Merging for /v2/alerts and /v2/alerts/groups to follow.

  Signed-off-by: Steve Simpson <steve.simpson@grafana.com>

* Review comments.

  Signed-off-by: Steve Simpson <steve.simpson@grafana.com>
1 parent 75c29db commit c724737
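To make the merge semantics described in the commit message concrete, here is a minimal usage sketch (not part of the commit); it assumes the merger package added below is importable, and the fingerprints and labels are invented for illustration. Feeding two replica bodies for GET /v1/alerts through the new V1Alerts merger yields a single body containing the union of the alerts, with one instance kept per fingerprint.

package main

import (
    "fmt"

    "github.com/cortexproject/cortex/pkg/alertmanager/merger"
)

func main() {
    // Two replica responses for GET /v1/alerts; fingerprint "bbb" appears in both,
    // so only one instance of it survives the merge (chosen arbitrarily).
    replica1 := []byte(`{"status":"success","data":[{"fingerprint":"bbb","labels":{"alertname":"B"}}]}`)
    replica2 := []byte(`{"status":"success","data":[{"fingerprint":"aaa","labels":{"alertname":"A"}},{"fingerprint":"bbb","labels":{"alertname":"B"}}]}`)

    merged, err := merger.V1Alerts{}.MergeResponses([][]byte{replica1, replica2})
    if err != nil {
        panic(err)
    }

    // Prints one "success" body with alerts "aaa" and "bbb", ordered by fingerprint
    // string, mirroring how Alertmanager orders the /v1/alerts listing.
    fmt.Println(string(merged))
}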

6 files changed, +346 -34 lines changed

integration/alertmanager_test.go

Lines changed: 11 additions & 8 deletions
@@ -497,17 +497,13 @@ func TestAlertmanagerSharding(t *testing.T) {
             require.NoError(t, err)
             err = c3.SendAlertToAlermanager(context.Background(), alert(3, 2))
             require.NoError(t, err)
-
-            // API does not block for the write slowest replica, and reads do not
-            // currently merge results from multiple replicas, so we have to wait.
-            require.NoError(t, alertmanagers.WaitSumMetricsWithOptions(
-                e2e.Equals(float64(3*testCfg.replicationFactor)),
-                []string{"cortex_alertmanager_alerts_received_total"},
-                e2e.SkipMissingMetrics))
         }
 
-        // Endpoint: GET /alerts
+        // Endpoint: GET /v1/alerts
         {
+            // Reads will query at least two replicas and merge the results.
+            // Therefore, the alerts we posted should always be visible.
+
             for _, c := range clients {
                 list, err := c.GetAlerts(context.Background())
                 require.NoError(t, err)
@@ -517,6 +513,13 @@ func TestAlertmanagerSharding(t *testing.T) {
 
         // Endpoint: GET /alerts/groups
         {
+            // Writes do not block for the write slowest replica, and reads do not
+            // currently merge results from multiple replicas, so we have to wait.
+            require.NoError(t, alertmanagers.WaitSumMetricsWithOptions(
+                e2e.Equals(float64(3*testCfg.replicationFactor)),
+                []string{"cortex_alertmanager_alerts_received_total"},
+                e2e.SkipMissingMetrics))
+
             for _, c := range clients {
                 list, err := c.GetAlertGroups(context.Background())
                 require.NoError(t, err)

pkg/alertmanager/distributor.go

Lines changed: 54 additions & 17 deletions
@@ -18,6 +18,7 @@ import (
     "github.com/weaveworks/common/httpgrpc"
     "github.com/weaveworks/common/user"
 
+    "github.com/cortexproject/cortex/pkg/alertmanager/merger"
     "github.com/cortexproject/cortex/pkg/ring"
     "github.com/cortexproject/cortex/pkg/ring/client"
     "github.com/cortexproject/cortex/pkg/tenant"
@@ -67,7 +68,8 @@ func (d *Distributor) running(ctx context.Context) error {
 // IsPathSupported returns true if the given route is currently supported by the Distributor.
 func (d *Distributor) IsPathSupported(p string) bool {
     // API can be found at https://petstore.swagger.io/?url=https://raw.githubusercontent.com/prometheus/alertmanager/master/api/v2/openapi.yaml.
-    return d.isQuorumWritePath(p) || d.isUnaryWritePath(p) || d.isUnaryDeletePath(p) || d.isUnaryReadPath(p)
+    isQuorumReadPath, _ := d.isQuorumReadPath(p)
+    return d.isQuorumWritePath(p) || d.isUnaryWritePath(p) || d.isUnaryDeletePath(p) || d.isUnaryReadPath(p) || isQuorumReadPath
 }
 
 func (d *Distributor) isQuorumWritePath(p string) bool {
@@ -82,8 +84,15 @@ func (d *Distributor) isUnaryDeletePath(p string) bool {
     return strings.HasSuffix(path.Dir(p), "/silence")
 }
 
+func (d *Distributor) isQuorumReadPath(p string) (bool, merger.Merger) {
+    if strings.HasSuffix(p, "/v1/alerts") {
+        return true, merger.V1Alerts{}
+    }
+    return false, nil
+}
+
 func (d *Distributor) isUnaryReadPath(p string) bool {
-    return strings.HasSuffix(p, "/alerts") ||
+    return strings.HasSuffix(p, "/v2/alerts") ||
         strings.HasSuffix(p, "/alerts/groups") ||
         strings.HasSuffix(p, "/silences") ||
         strings.HasSuffix(path.Dir(p), "/silence") ||
@@ -109,7 +118,7 @@ func (d *Distributor) DistributeRequest(w http.ResponseWriter, r *http.Request)
 
     if r.Method == http.MethodPost {
         if d.isQuorumWritePath(r.URL.Path) {
-            d.doQuorumWrite(userID, w, r, logger)
+            d.doQuorum(userID, w, r, logger, merger.Noop{})
             return
         }
         if d.isUnaryWritePath(r.URL.Path) {
@@ -124,6 +133,10 @@ func (d *Distributor) DistributeRequest(w http.ResponseWriter, r *http.Request)
         }
     }
     if r.Method == http.MethodGet || r.Method == http.MethodHead {
+        if ok, m := d.isQuorumReadPath(r.URL.Path); ok {
+            d.doQuorum(userID, w, r, logger, m)
+            return
+        }
         if d.isUnaryReadPath(r.URL.Path) {
             d.doUnary(userID, w, r, logger)
             return
@@ -133,7 +146,7 @@ func (d *Distributor) DistributeRequest(w http.ResponseWriter, r *http.Request)
     http.Error(w, "route not supported by distributor", http.StatusNotFound)
 }
 
-func (d *Distributor) doQuorumWrite(userID string, w http.ResponseWriter, r *http.Request, logger log.Logger) {
+func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Request, logger log.Logger, m merger.Merger) {
     var body []byte
     var err error
     if r.Body != nil {
@@ -149,13 +162,13 @@ func (d *Distributor) doQuorumWrite(userID string, w http.ResponseWriter, r *htt
         }
     }
 
-    var firstSuccessfulResponse *httpgrpc.HTTPResponse
-    var firstSuccessfulResponseMtx sync.Mutex
+    var responses []*httpgrpc.HTTPResponse
+    var responsesMtx sync.Mutex
     grpcHeaders := httpToHttpgrpcHeaders(r.Header)
     err = ring.DoBatch(r.Context(), RingOp, d.alertmanagerRing, []uint32{shardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
         // Use a background context to make sure all alertmanagers get the request even if we return early.
         localCtx := user.InjectOrgID(context.Background(), userID)
-        sp, localCtx := opentracing.StartSpanFromContext(localCtx, "Distributor.doQuorumWrite")
+        sp, localCtx := opentracing.StartSpanFromContext(localCtx, "Distributor.doQuorum")
         defer sp.Finish()
 
         resp, err := d.doRequest(localCtx, am, &httpgrpc.HTTPRequest{
@@ -172,11 +185,9 @@ func (d *Distributor) doQuorumWrite(userID string, w http.ResponseWriter, r *htt
             return httpgrpc.ErrorFromHTTPResponse(resp)
         }
 
-        firstSuccessfulResponseMtx.Lock()
-        if firstSuccessfulResponse == nil {
-            firstSuccessfulResponse = resp
-        }
-        firstSuccessfulResponseMtx.Unlock()
+        responsesMtx.Lock()
+        responses = append(responses, resp)
+        responsesMtx.Unlock()
 
         return nil
     }, func() {})
@@ -186,12 +197,12 @@ func (d *Distributor) doQuorumWrite(userID string, w http.ResponseWriter, r *htt
         return
     }
 
-    firstSuccessfulResponseMtx.Lock() // Another request might be ongoing after quorum.
-    resp := firstSuccessfulResponse
-    firstSuccessfulResponseMtx.Unlock()
+    responsesMtx.Lock() // Another request might be ongoing after quorum.
+    resps := responses
+    responsesMtx.Unlock()
 
-    if resp != nil {
-        respondFromHTTPGRPCResponse(w, resp)
+    if len(resps) > 0 {
+        respondFromMultipleHTTPGRPCResponses(w, logger, resps, m)
     } else {
         // This should not happen.
         level.Error(logger).Log("msg", "distributor did not receive any response from alertmanagers, but there were no errors")
@@ -287,3 +298,29 @@ func httpToHttpgrpcHeaders(hs http.Header) []*httpgrpc.Header {
     }
     return result
 }
+
+func respondFromMultipleHTTPGRPCResponses(w http.ResponseWriter, logger log.Logger, responses []*httpgrpc.HTTPResponse, merger merger.Merger) {
+    bodies := make([][]byte, len(responses))
+    for i, r := range responses {
+        bodies[i] = r.Body
+    }
+
+    body, err := merger.MergeResponses(bodies)
+    if err != nil {
+        level.Error(logger).Log("msg", "failed to merge responses for request", "err", err)
+        w.WriteHeader(http.StatusInternalServerError)
+        return
+    }
+
+    // It is assumed by using this function, the caller knows that the responses it receives
+    // have already been checked for success or failure, and that the headers will always
+    // match due to the nature of the request. If this is not the case, a different merge
+    // function should be implemented to cope with the differing responses.
+    response := &httpgrpc.HTTPResponse{
+        Code:    responses[0].Code,
+        Headers: responses[0].Headers,
+        Body:    body,
+    }
+
+    respondFromHTTPGRPCResponse(w, response)
+}
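The commit message notes that merging for /v2/alerts and /v2/alerts/groups is to follow. Purely as a hypothetical sketch of that follow-up (merger.V2Alerts and merger.V2AlertGroups do not exist in this commit), isQuorumReadPath could grow along these lines, with the matching suffixes dropped from isUnaryReadPath at the same time:

// Hypothetical follow-up only; the v2 mergers are not part of this commit.
func (d *Distributor) isQuorumReadPath(p string) (bool, merger.Merger) {
    if strings.HasSuffix(p, "/v1/alerts") {
        return true, merger.V1Alerts{}
    }
    if strings.HasSuffix(p, "/v2/alerts") {
        return true, merger.V2Alerts{}
    }
    if strings.HasSuffix(p, "/v2/alerts/groups") {
        return true, merger.V2AlertGroups{}
    }
    return false, nil
}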

pkg/alertmanager/distributor_test.go

Lines changed: 33 additions & 9 deletions
@@ -41,6 +41,9 @@ func TestDistributor_DistributeRequest(t *testing.T) {
         expectedTotalCalls  int
         headersNotPreserved bool
         route               string
+        // Paths where responses are merged, we need to supply a valid response body.
+        // Note that the actual merging logic is tested elsewhere (merger_test.go).
+        responseBody []byte
     }{
         {
             name:               "Write /alerts, Simple AM request, all AM healthy",
@@ -68,14 +71,24 @@ func TestDistributor_DistributeRequest(t *testing.T) {
             expectedTotalCalls: 3,
             route:              "/alerts",
         }, {
-            name:               "Read /alerts is sent to only 1 AM",
+            name:               "Read /v1/alerts is sent to 3 AMs",
+            numAM:              5,
+            numHappyAM:         5,
+            replicationFactor:  3,
+            isRead:             true,
+            expStatusCode:      http.StatusOK,
+            expectedTotalCalls: 3,
+            route:              "/v1/alerts",
+            responseBody:       []byte(`{"status":"success","data":[]}`),
+        }, {
+            name:               "Read /v2/alerts is sent to only 1 AM",
             numAM:              5,
             numHappyAM:         5,
             replicationFactor:  3,
             isRead:             true,
             expStatusCode:      http.StatusOK,
             expectedTotalCalls: 1,
-            route:              "/alerts",
+            route:              "/v2/alerts",
         }, {
             name:               "Read /alerts/groups is sent to only 1 AM",
             numAM:              5,
@@ -189,7 +202,7 @@ func TestDistributor_DistributeRequest(t *testing.T) {
     for _, c := range cases {
         t.Run(c.name, func(t *testing.T) {
             route := "/alertmanager/api/v1" + c.route
-            d, ams, cleanup := prepare(t, c.numAM, c.numHappyAM, c.replicationFactor)
+            d, ams, cleanup := prepare(t, c.numAM, c.numHappyAM, c.replicationFactor, c.responseBody)
             t.Cleanup(cleanup)
 
             ctx := user.InjectOrgID(context.Background(), "1")
@@ -207,7 +220,6 @@ func TestDistributor_DistributeRequest(t *testing.T) {
             w := httptest.NewRecorder()
             d.DistributeRequest(w, req)
             resp := w.Result()
-
             require.Equal(t, c.expStatusCode, resp.StatusCode)
 
             if !c.headersNotPreserved {
@@ -252,26 +264,35 @@ func TestDistributor_IsPathSupported(t *testing.T) {
         "/alertmanager/api/v1/status":           true,
         "/alertmanager/api/v1/receivers":        true,
         "/alertmanager/api/v1/other":            false,
+        "/alertmanager/api/v2/alerts":           true,
+        "/alertmanager/api/v2/alerts/groups":    true,
+        "/alertmanager/api/v2/silences":         true,
+        "/alertmanager/api/v2/silence/id":       true,
+        "/alertmanager/api/v2/silence/anything": true,
+        "/alertmanager/api/v2/silence/really":   true,
+        "/alertmanager/api/v2/status":           true,
+        "/alertmanager/api/v2/receivers":        true,
+        "/alertmanager/api/v2/other":            false,
         "/alertmanager/other":                   false,
         "/other":                                false,
     }
 
     for path, isSupported := range supported {
         t.Run(path, func(t *testing.T) {
-            d, _, cleanup := prepare(t, 1, 1, 1)
+            d, _, cleanup := prepare(t, 1, 1, 1, []byte{})
             t.Cleanup(cleanup)
             require.Equal(t, isSupported, d.IsPathSupported(path))
         })
     }
 }
 
-func prepare(t *testing.T, numAM, numHappyAM, replicationFactor int) (*Distributor, []*mockAlertmanager, func()) {
+func prepare(t *testing.T, numAM, numHappyAM, replicationFactor int, responseBody []byte) (*Distributor, []*mockAlertmanager, func()) {
     ams := []*mockAlertmanager{}
     for i := 0; i < numHappyAM; i++ {
-        ams = append(ams, newMockAlertmanager(i, true))
+        ams = append(ams, newMockAlertmanager(i, true, responseBody))
     }
     for i := numHappyAM; i < numAM; i++ {
-        ams = append(ams, newMockAlertmanager(i, false))
+        ams = append(ams, newMockAlertmanager(i, false, responseBody))
     }
 
     // Use a real ring with a mock KV store to test ring RF logic.
@@ -332,13 +353,15 @@ type mockAlertmanager struct {
     mtx          sync.Mutex
     myAddr       string
     happy        bool
+    responseBody []byte
 }
 
-func newMockAlertmanager(idx int, happy bool) *mockAlertmanager {
+func newMockAlertmanager(idx int, happy bool, responseBody []byte) *mockAlertmanager {
     return &mockAlertmanager{
         receivedRequests: make(map[string]map[int]int),
         myAddr:           fmt.Sprintf("127.0.0.1:%05d", 10000+idx),
         happy:            happy,
+        responseBody:     responseBody,
     }
 }
 
@@ -370,6 +393,7 @@ func (am *mockAlertmanager) HandleRequest(_ context.Context, in *httpgrpc.HTTPRe
                 Values: []string{"ok-option-1", "ok-option-2"},
             },
         },
+        Body: am.responseBody,
     }, nil
 }
 

pkg/alertmanager/merger/merger.go

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+package merger
+
+// Merger represents logic for merging response bodies.
+type Merger interface {
+    MergeResponses([][]byte) ([]byte, error)
+}
+
+// Noop is an implementation of the Merger interface which does not actually merge
+// responses, but just returns an arbitrary response(the first in the list). It can
+// be used for write requests where the response is either empty or inconsequential.
+type Noop struct{}
+
+func (Noop) MergeResponses(in [][]byte) ([]byte, error) {
+    if len(in) == 0 {
+        return nil, nil
+    }
+    return in[0], nil
+}
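A quick illustrative sketch (not part of the commit) of the Noop contract used on the quorum write path: the first body is returned untouched, which is enough because write responses are not merged.

package main

import (
    "fmt"

    "github.com/cortexproject/cortex/pkg/alertmanager/merger"
)

func main() {
    // Noop simply hands back the first response body it is given.
    out, err := merger.Noop{}.MergeResponses([][]byte{
        []byte(`{"status":"success"}`),
        []byte(`{"status":"success"}`),
    })
    if err != nil {
        panic(err)
    }
    fmt.Println(string(out)) // {"status":"success"}
}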

pkg/alertmanager/merger/v1_alerts.go

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
+package merger
+
+import (
+    "encoding/json"
+    "fmt"
+    "sort"
+
+    v1 "github.com/prometheus/alertmanager/api/v1"
+)
+
+const (
+    statusSuccess = "success"
+)
+
+// V1Alerts implements the Merger interface for GET /v1/alerts. It returns the union of alerts over
+// all the responses. When the same alert exists in multiple responses, the alert instance in the
+// earliest response is returned in the final response. We cannot use the UpdatedAt timestamp as
+// for V2Alerts, because the v1 API does not provide it.
+type V1Alerts struct{}
+
+func (V1Alerts) MergeResponses(in [][]byte) ([]byte, error) {
+    type bodyType struct {
+        Status string      `json:"status"`
+        Data   []*v1.Alert `json:"data"`
+    }
+
+    alerts := make([]*v1.Alert, 0)
+    for _, body := range in {
+        parsed := bodyType{}
+        if err := json.Unmarshal(body, &parsed); err != nil {
+            return nil, err
+        }
+        if parsed.Status != statusSuccess {
+            return nil, fmt.Errorf("unable to merge response of status: %s", parsed.Status)
+        }
+        alerts = append(alerts, parsed.Data...)
+    }
+
+    merged, err := mergeV1Alerts(alerts)
+    if err != nil {
+        return nil, err
+    }
+    body := bodyType{
+        Status: statusSuccess,
+        Data:   merged,
+    }
+
+    return json.Marshal(body)
+}
+
+func mergeV1Alerts(in []*v1.Alert) ([]*v1.Alert, error) {
+    // Select an arbitrary alert for each distinct alert.
+    alerts := make(map[string]*v1.Alert)
+    for _, alert := range in {
+        key := alert.Fingerprint
+        if _, ok := alerts[key]; !ok {
+            alerts[key] = alert
+        }
+    }
+
+    result := make([]*v1.Alert, 0, len(alerts))
+    for _, alert := range alerts {
+        result = append(result, alert)
+    }
+
+    // Mimic Alertmanager which returns alerts ordered by fingerprint (as string).
+    sort.Slice(result, func(i, j int) bool {
+        return result[i].Fingerprint < result[j].Fingerprint
+    })
+
+    return result, nil
+}
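The distributor test above points at merger_test.go for coverage of the merging logic itself; that file is not shown in this excerpt. Purely as a sketch of how the union, de-duplication, and fingerprint ordering could be exercised (the test name and inputs below are illustrative, not the commit's actual tests):

package merger

import (
    "encoding/json"
    "testing"

    "github.com/stretchr/testify/require"
)

func TestV1AlertsMergeSketch(t *testing.T) {
    in := [][]byte{
        []byte(`{"status":"success","data":[{"fingerprint":"bbb","labels":{"alertname":"B"}}]}`),
        []byte(`{"status":"success","data":[{"fingerprint":"aaa","labels":{"alertname":"A"}},{"fingerprint":"bbb","labels":{"alertname":"B"}}]}`),
    }

    out, err := V1Alerts{}.MergeResponses(in)
    require.NoError(t, err)

    // Decode just enough of the merged body to check the fingerprints.
    var parsed struct {
        Status string `json:"status"`
        Data   []struct {
            Fingerprint string `json:"fingerprint"`
        } `json:"data"`
    }
    require.NoError(t, json.Unmarshal(out, &parsed))
    require.Equal(t, "success", parsed.Status)

    // Union of the two responses, de-duplicated and ordered by fingerprint string.
    require.Len(t, parsed.Data, 2)
    require.Equal(t, "aaa", parsed.Data[0].Fingerprint)
    require.Equal(t, "bbb", parsed.Data[1].Fingerprint)
}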
