Skip to content

Add minimal support for missing routes in alertmanager distributor. #4084

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 143 additions & 18 deletions integration/alertmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,37 +311,162 @@ func TestAlertmanagerSharding(t *testing.T) {
userID := "user-5"

// 2. Let's create a silence
silence := types.Silence{
Matchers: amlabels.Matchers{
{Name: "instance", Value: "prometheus-one"},
},
Comment: "Created for a test case.",
StartsAt: time.Now(),
EndsAt: time.Now().Add(time.Hour),
comment := func(i int) string {
return fmt.Sprintf("Silence Comment #%d", i)
}
silence := func(i int) types.Silence {
return types.Silence{
Matchers: amlabels.Matchers{
{Name: "instance", Value: "prometheus-one"},
},
Comment: comment(i),
StartsAt: time.Now(),
EndsAt: time.Now().Add(time.Hour),
}
}

// 2b. For each tenant, with a replication factor of 2 and 3 instances there's a chance the user might not be in one of the replicas.
// Therefore, try to create a silence on every instance and expect two silences to exist.
// 2b. For each tenant, with a replication factor of 2 and 3 instances,
// the user will not be present in one of the instances.
// However, the distributor should route us to a correct instance.
c1, err := e2ecortex.NewClient("", "", alertmanager1.HTTPEndpoint(), "", userID)
require.NoError(t, err)
c2, err := e2ecortex.NewClient("", "", alertmanager2.HTTPEndpoint(), "", userID)
require.NoError(t, err)
c3, err := e2ecortex.NewClient("", "", alertmanager3.HTTPEndpoint(), "", userID)
require.NoError(t, err)

errs := []error{}
if err := c1.CreateSilence(context.Background(), silence); err != nil {
errs = append(errs, err)
clients := []*e2ecortex.Client{c1, c2, c3}

waitForSilences := func(state string, amount int) error {
return alertmanagers.WaitSumMetricsWithOptions(
e2e.Equals(float64(amount)),
[]string{"cortex_alertmanager_silences"},
e2e.WaitMissingMetrics,
e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "state", state),
),
)
}
if err := c2.CreateSilence(context.Background(), silence); err != nil {
errs = append(errs, err)

var id1, id2, id3 string

// Endpoint: POST /silences
{
id1, err = c1.CreateSilence(context.Background(), silence(1))
assert.NoError(t, err)
id2, err = c2.CreateSilence(context.Background(), silence(2))
assert.NoError(t, err)
id3, err = c3.CreateSilence(context.Background(), silence(3))
assert.NoError(t, err)

// Reading silences does not currently read from all replicas. We have to wait for
// the silences to be replicated asynchronously before we can reliably read them.
require.NoError(t, waitForSilences("active", 6))
}
if err := c3.CreateSilence(context.Background(), silence); err != nil {
errs = append(errs, err)

assertSilences := func(list []types.Silence, s1, s2, s3 types.SilenceState) {
assert.Equal(t, 3, len(list))

ids := make(map[string]types.Silence, len(list))
for _, s := range list {
ids[s.ID] = s
}

require.Contains(t, ids, id1)
assert.Equal(t, comment(1), ids[id1].Comment)
assert.Equal(t, s1, ids[id1].Status.State)
require.Contains(t, ids, id2)
assert.Equal(t, comment(2), ids[id2].Comment)
assert.Equal(t, s2, ids[id2].Status.State)
require.Contains(t, ids, id3)
assert.Equal(t, comment(3), ids[id3].Comment)
assert.Equal(t, s3, ids[id3].Status.State)
}

// Endpoint: GET /silences
{
for _, c := range clients {
list, err := c.GetSilences(context.Background())
require.NoError(t, err)
assertSilences(list, types.SilenceStateActive, types.SilenceStateActive, types.SilenceStateActive)
}
}

// Endpoint: GET /silence/{id}
{
for _, c := range clients {
sil1, err := c.GetSilence(context.Background(), id1)
require.NoError(t, err)
assert.Equal(t, comment(1), sil1.Comment)
assert.Equal(t, types.SilenceStateActive, sil1.Status.State)

sil2, err := c.GetSilence(context.Background(), id2)
require.NoError(t, err)
assert.Equal(t, comment(2), sil2.Comment)
assert.Equal(t, types.SilenceStateActive, sil2.Status.State)

sil3, err := c.GetSilence(context.Background(), id3)
require.NoError(t, err)
assert.Equal(t, comment(3), sil3.Comment)
assert.Equal(t, types.SilenceStateActive, sil3.Status.State)
}
}
assert.Equal(t, 1, len(errs), "expected exactly one client to error, got:\n %v", errs)

assert.NoError(t, alertmanagers.WaitSumMetricsWithOptions(e2e.Equals(float64(4)), []string{"cortex_alertmanager_silences"}), e2e.WaitMissingMetrics)
// Endpoint: GET /receivers
{
for _, c := range clients {
list, err := c.GetReceivers(context.Background())
assert.NoError(t, err)
assert.ElementsMatch(t, list, []string{"dummy"})
}
}

// Endpoint: GET /status
{
for _, c := range clients {
_, err := c.GetAlertmanagerConfig(context.Background())
assert.NoError(t, err)
}
}

// Endpoint: DELETE /silence/{id}
{
// Delete one silence via each instance. Listing the silences on
// all other instances should yield the silence being expired.
err = c1.DeleteSilence(context.Background(), id2)
assert.NoError(t, err)

// These waits are required as deletion replication is currently
// asynchronous, and silence reading is not consistent. Once
// merging is implemented on the read path, this is not needed.
require.NoError(t, waitForSilences("expired", 2))

for _, c := range clients {
list, err := c.GetSilences(context.Background())
require.NoError(t, err)
assertSilences(list, types.SilenceStateActive, types.SilenceStateExpired, types.SilenceStateActive)
}

err = c2.DeleteSilence(context.Background(), id3)
assert.NoError(t, err)
require.NoError(t, waitForSilences("expired", 4))

for _, c := range clients {
list, err := c.GetSilences(context.Background())
require.NoError(t, err)
assertSilences(list, types.SilenceStateActive, types.SilenceStateExpired, types.SilenceStateExpired)
}

err = c3.DeleteSilence(context.Background(), id1)
assert.NoError(t, err)
require.NoError(t, waitForSilences("expired", 6))

for _, c := range clients {
list, err := c.GetSilences(context.Background())
require.NoError(t, err)
assertSilences(list, types.SilenceStateExpired, types.SilenceStateExpired, types.SilenceStateExpired)
}
}
})
}
}
161 changes: 158 additions & 3 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,15 +509,128 @@ func (c *Client) SendAlertToAlermanager(ctx context.Context, alert *model.Alert)
return nil
}

func (c *Client) CreateSilence(ctx context.Context, silence types.Silence) error {
// CreateSilence creates a new silence and returns the unique identifier of the silence.
func (c *Client) CreateSilence(ctx context.Context, silence types.Silence) (string, error) {
u := c.alertmanagerClient.URL("api/prom/api/v1/silences", nil)

data, err := json.Marshal(silence)
if err != nil {
return fmt.Errorf("error marshaling the silence: %s", err)
return "", fmt.Errorf("error marshaling the silence: %s", err)
}

req, err := http.NewRequest(http.MethodPost, u.String(), bytes.NewReader(data))
if err != nil {
return "", fmt.Errorf("error creating request: %v", err)
}

resp, body, err := c.alertmanagerClient.Do(ctx, req)
if err != nil {
return "", err
}

if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("creating the silence failed with status %d and error %v", resp.StatusCode, string(body))
}

type response struct {
Status string `json:"status"`
Data struct {
SilenceID string `json:"silenceID"`
} `json:"data"`
}

decoded := &response{}
if err := json.Unmarshal(body, decoded); err != nil {
return "", err
}

if decoded.Status != "success" {
return "", fmt.Errorf("unexpected response status '%s'", decoded.Status)
}

return decoded.Data.SilenceID, nil
}

// GetSilences fetches the list of silences with a GET request to
// api/prom/api/v1/silences on the Alertmanager client.
//
// It returns ErrNotFound when the server answers 404, and a descriptive
// error for any other non-2xx status, for a body that cannot be decoded as
// JSON, or for a decoded response whose "status" field is not "success".
func (c *Client) GetSilences(ctx context.Context) ([]types.Silence, error) {
	u := c.alertmanagerClient.URL("api/prom/api/v1/silences", nil)

	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
	if err != nil {
		// %w keeps the underlying error reachable through errors.Is/As.
		return nil, fmt.Errorf("error creating request: %w", err)
	}

	resp, body, err := c.alertmanagerClient.Do(ctx, req)
	if err != nil {
		return nil, err
	}

	// 404 is reported through the sentinel so callers can distinguish it
	// from other failures.
	if resp.StatusCode == http.StatusNotFound {
		return nil, ErrNotFound
	}

	if resp.StatusCode/100 != 2 {
		return nil, fmt.Errorf("getting silences failed with status %d and error %v", resp.StatusCode, string(body))
	}

	// response is the JSON envelope expected in the body.
	type response struct {
		Status string          `json:"status"`
		Data   []types.Silence `json:"data"`
	}

	decoded := &response{}
	if err := json.Unmarshal(body, decoded); err != nil {
		return nil, fmt.Errorf("decoding silences response: %w", err)
	}

	if decoded.Status != "success" {
		return nil, fmt.Errorf("unexpected response status '%s'", decoded.Status)
	}

	return decoded.Data, nil
}

// GetSilence fetches a single silence by its identifier with a GET request
// to api/prom/api/v1/silence/{id}. The id is path-escaped before being
// placed in the URL.
//
// It returns ErrNotFound when the server answers 404, and a descriptive
// error for any other non-2xx status, for a body that cannot be decoded as
// JSON, or for a decoded response whose "status" field is not "success".
// The returned silence is the zero value whenever the error is non-nil.
func (c *Client) GetSilence(ctx context.Context, id string) (types.Silence, error) {
	u := c.alertmanagerClient.URL(fmt.Sprintf("api/prom/api/v1/silence/%s", url.PathEscape(id)), nil)

	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
	if err != nil {
		// %w keeps the underlying error reachable through errors.Is/As.
		return types.Silence{}, fmt.Errorf("error creating request: %w", err)
	}

	resp, body, err := c.alertmanagerClient.Do(ctx, req)
	if err != nil {
		return types.Silence{}, err
	}

	// 404 is reported through the sentinel so callers can distinguish it
	// from other failures.
	if resp.StatusCode == http.StatusNotFound {
		return types.Silence{}, ErrNotFound
	}

	if resp.StatusCode/100 != 2 {
		return types.Silence{}, fmt.Errorf("getting silence failed with status %d and error %v", resp.StatusCode, string(body))
	}

	// response is the JSON envelope expected in the body.
	type response struct {
		Status string        `json:"status"`
		Data   types.Silence `json:"data"`
	}

	decoded := &response{}
	if err := json.Unmarshal(body, decoded); err != nil {
		return types.Silence{}, fmt.Errorf("decoding silence response: %w", err)
	}

	if decoded.Status != "success" {
		return types.Silence{}, fmt.Errorf("unexpected response status '%s'", decoded.Status)
	}

	return decoded.Data, nil
}

func (c *Client) DeleteSilence(ctx context.Context, id string) error {
u := c.alertmanagerClient.URL(fmt.Sprintf("api/prom/api/v1/silence/%s", url.PathEscape(id)), nil)

req, err := http.NewRequest(http.MethodDelete, u.String(), nil)
if err != nil {
return fmt.Errorf("error creating request: %v", err)
}
Expand All @@ -527,13 +640,55 @@ func (c *Client) CreateSilence(ctx context.Context, silence types.Silence) error
return err
}

if resp.StatusCode == http.StatusNotFound {
return ErrNotFound
}

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("creating the silence failed with status %d and error %v", resp.StatusCode, string(body))
return fmt.Errorf("deleting silence failed with status %d and error %v", resp.StatusCode, string(body))
}

return nil
}

// GetReceivers fetches the list of receiver names with a GET request to
// api/prom/api/v1/receivers on the Alertmanager client.
//
// It returns ErrNotFound when the server answers 404, and a descriptive
// error for any other non-2xx status, for a body that cannot be decoded as
// JSON, or for a decoded response whose "status" field is not "success".
func (c *Client) GetReceivers(ctx context.Context) ([]string, error) {
	u := c.alertmanagerClient.URL("api/prom/api/v1/receivers", nil)

	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
	if err != nil {
		// %w keeps the underlying error reachable through errors.Is/As.
		return nil, fmt.Errorf("error creating request: %w", err)
	}

	resp, body, err := c.alertmanagerClient.Do(ctx, req)
	if err != nil {
		return nil, err
	}

	// 404 is reported through the sentinel so callers can distinguish it
	// from other failures.
	if resp.StatusCode == http.StatusNotFound {
		return nil, ErrNotFound
	}

	if resp.StatusCode/100 != 2 {
		return nil, fmt.Errorf("getting receivers failed with status %d and error %v", resp.StatusCode, string(body))
	}

	// response is the JSON envelope expected in the body.
	type response struct {
		Status string   `json:"status"`
		Data   []string `json:"data"`
	}

	decoded := &response{}
	if err := json.Unmarshal(body, decoded); err != nil {
		return nil, fmt.Errorf("decoding receivers response: %w", err)
	}

	if decoded.Status != "success" {
		return nil, fmt.Errorf("unexpected response status '%s'", decoded.Status)
	}

	return decoded.Data, nil
}

func (c *Client) PostRequest(url string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequest("POST", url, body)
if err != nil {
Expand Down
Loading