Retrying AlertManager UnaryPath requests on the next replica if one fails #4422

Closed
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -39,6 +39,7 @@
* [ENHANCEMENT] Memberlist: optimized receive path for processing ring state updates, to help reduce CPU utilization in large clusters. #4345
* [ENHANCEMENT] Memberlist: expose configuration of memberlist packet compression via `-memberlist.compression=enabled`. #4346
* [ENHANCEMENT] Updated Prometheus to include changes from prometheus/prometheus#9083. Now whenever `/labels` API calls include matchers, blocks store is queried for `LabelNames` with matchers instead of `Series` calls which was inefficient. #4380
* [ENHANCEMENT] AlertManager: Retry non-quorum requests (Get Alertmanager status, Get Alertmanager Receivers, Create/Delete Alertmanager Silences) on the next replica on error. #4422
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update the number of clusters per user correctly. #4336
* [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335
* [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have already been closed. #4304
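For context, the enhancement entry above means a failed non-quorum (unary-path) request is no longer returned to the caller immediately: the distributor walks the replicas in random order and only surfaces an error once every replica has failed. A minimal standalone sketch of that pattern is below; the `replica`, `doRequest`, and `queryWithRetry` names are illustrative stand-ins, not the actual Cortex types.

```go
package main

import (
	"fmt"
	"math/rand"
)

// replica stands in for an Alertmanager instance; the real code iterates
// over ring instance descriptors and sends httpgrpc requests.
type replica struct {
	addr    string
	healthy bool
}

// doRequest is a stand-in for the distributor's per-instance request.
func doRequest(r replica) (string, error) {
	if !r.healthy {
		return "", fmt.Errorf("replica %s unavailable", r.addr)
	}
	return "200 OK from " + r.addr, nil
}

// queryWithRetry tries the replicas in random order and returns the first
// successful response; an error is only surfaced once every replica failed.
func queryWithRetry(replicas []replica) (string, error) {
	// Randomize the order so the same replica is not always hit first.
	rand.Shuffle(len(replicas), func(i, j int) {
		replicas[i], replicas[j] = replicas[j], replicas[i]
	})

	var lastErr error
	for _, r := range replicas {
		resp, err := doRequest(r)
		if err == nil {
			return resp, nil
		}
		lastErr = err
	}
	return "", lastErr
}

func main() {
	replicas := []replica{
		{addr: "am-0:9094", healthy: false},
		{addr: "am-1:9094", healthy: true},
		{addr: "am-2:9094", healthy: true},
	}
	resp, err := queryWithRetry(replicas)
	fmt.Println(resp, err)
}
```

The actual change in `pkg/alertmanager/distributor.go` follows the same shape, as shown in the diff below.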
28 changes: 21 additions & 7 deletions pkg/alertmanager/distributor.go
@@ -254,14 +254,28 @@ func (d *Distributor) doUnary(userID string, w http.ResponseWriter, r *http.Requ
defer sp.Finish()
// Until we have a mechanism to combine the results from multiple alertmanagers,
// we forward the request to only one of the alertmanagers.
amDesc := replicationSet.Instances[rand.Intn(len(replicationSet.Instances))]
resp, err := d.doRequest(ctx, amDesc, req)
if err != nil {
respondFromError(err, w, logger)
return
}
instances := replicationSet.Instances

// Randomize the list of instances to not always query the same one.
rand.Shuffle(len(instances), func(i, j int) {
instances[i], instances[j] = instances[j], instances[i]
})

for i, instance := range instances {
resp, err := d.doRequest(ctx, instance, req)

respondFromHTTPGRPCResponse(w, resp)
// Only return the error if there are no more instances to try
if err != nil && i == len(instances)-1 {
respondFromError(err, w, logger)
return
}

// Return on the first successful request
if err == nil {
respondFromHTTPGRPCResponse(w, resp)
return
}
}
}

func respondFromError(err error, w http.ResponseWriter, logger log.Logger) {
44 changes: 37 additions & 7 deletions pkg/alertmanager/distributor_test.go
@@ -13,6 +13,8 @@ import (
"testing"
"time"

"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
@@ -35,6 +37,7 @@ func TestDistributor_DistributeRequest(t *testing.T) {
name string
numAM, numHappyAM int
replicationFactor int
failedReqCount int
isRead bool
isDelete bool
expStatusCode int
@@ -194,6 +197,27 @@
expStatusCode: http.StatusOK,
expectedTotalCalls: 1,
route: "/status",
},
{
name: "Read /status should try all alert managers on error",
numAM: 3,
numHappyAM: 0,
replicationFactor: 3,
isRead: true,
expStatusCode: http.StatusInternalServerError,
expectedTotalCalls: 3,
route: "/status",
},
{
name: "Read /status is sent to 3 AM when 2 are not happy",
numAM: 3,
numHappyAM: 3,
failedReqCount: 2,
replicationFactor: 3,
isRead: true,
expStatusCode: http.StatusOK,
expectedTotalCalls: 3,
route: "/status",
}, {
name: "Write /status not supported",
numAM: 5,
@@ -227,7 +251,7 @@ func TestDistributor_DistributeRequest(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
route := "/alertmanager/api/v1" + c.route
d, ams, cleanup := prepare(t, c.numAM, c.numHappyAM, c.replicationFactor, c.responseBody)
d, ams, cleanup := prepare(t, c.numAM, c.numHappyAM, c.failedReqCount, c.replicationFactor, c.responseBody)
t.Cleanup(cleanup)

ctx := user.InjectOrgID(context.Background(), "1")
@@ -304,20 +328,22 @@ func TestDistributor_IsPathSupported(t *testing.T) {

for path, isSupported := range supported {
t.Run(path, func(t *testing.T) {
d, _, cleanup := prepare(t, 1, 1, 1, []byte{})
d, _, cleanup := prepare(t, 1, 1, 0, 1, []byte{})
t.Cleanup(cleanup)
require.Equal(t, isSupported, d.IsPathSupported(path))
})
}
}

func prepare(t *testing.T, numAM, numHappyAM, replicationFactor int, responseBody []byte) (*Distributor, []*mockAlertmanager, func()) {
func prepare(t *testing.T, numAM, numHappyAM, totalFailedReq, replicationFactor int, responseBody []byte) (*Distributor, []*mockAlertmanager, func()) {
ams := []*mockAlertmanager{}
failedReqCount := atomic.NewInt32(int32(totalFailedReq))

for i := 0; i < numHappyAM; i++ {
ams = append(ams, newMockAlertmanager(i, true, responseBody))
ams = append(ams, newMockAlertmanager(i, true, failedReqCount, responseBody))
}
for i := numHappyAM; i < numAM; i++ {
ams = append(ams, newMockAlertmanager(i, false, responseBody))
ams = append(ams, newMockAlertmanager(i, false, failedReqCount, responseBody))
}

// Use a real ring with a mock KV store to test ring RF logic.
@@ -379,14 +405,16 @@ type mockAlertmanager struct {
myAddr string
happy bool
responseBody []byte
failedReqCount *atomic.Int32
}

func newMockAlertmanager(idx int, happy bool, responseBody []byte) *mockAlertmanager {
func newMockAlertmanager(idx int, happy bool, failedReqCount *atomic.Int32, responseBody []byte) *mockAlertmanager {
return &mockAlertmanager{
receivedRequests: make(map[string]map[int]int),
myAddr: fmt.Sprintf("127.0.0.1:%05d", 10000+idx),
happy: happy,
responseBody: responseBody,
failedReqCount: failedReqCount,
}
}

@@ -405,7 +433,9 @@ func (am *mockAlertmanager) HandleRequest(_ context.Context, in *httpgrpc.HTTPRe
am.receivedRequests[path] = m
}

if am.happy {
failedCountLocal := am.failedReqCount.Dec()

if am.happy && failedCountLocal < 0 {
m[http.StatusOK]++
return &httpgrpc.HTTPResponse{
Code: http.StatusOK,
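For reference, the new `failedReqCount` plumbing in the test above makes the mocks fail the first N requests overall: every call decrements a counter shared by all mocks, and a mock only answers 200 once the counter has gone negative (and the mock is `happy`). A minimal standalone sketch of that shared-countdown pattern, using `go.uber.org/atomic` and hypothetical `mockBackend`/`handle` names, could look like this:

```go
package main

import (
	"fmt"
	"net/http"

	"go.uber.org/atomic"
)

// mockBackend fails requests while the shared failure budget is still positive.
type mockBackend struct {
	happy          bool
	failedReqCount *atomic.Int32 // shared across all mocks
}

func (m *mockBackend) handle() int {
	// Dec returns the new value, so the first `totalFailedReq` calls
	// (across every mock sharing the counter) see a value >= 0 and fail.
	remaining := m.failedReqCount.Dec()
	if m.happy && remaining < 0 {
		return http.StatusOK
	}
	return http.StatusInternalServerError
}

func main() {
	budget := atomic.NewInt32(2) // fail the first two requests
	backends := []*mockBackend{
		{happy: true, failedReqCount: budget},
		{happy: true, failedReqCount: budget},
		{happy: true, failedReqCount: budget},
	}
	for i, b := range backends {
		fmt.Printf("request %d -> %d\n", i, b.handle())
	}
	// Prints 500, 500, 200: the third attempt succeeds once the budget is spent,
	// mirroring the "sent to 3 AM when 2 are not happy" test case above.
}
```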