Skip to content

Commit cda99eb

Browse files
committed
fix: check alert threshold larger or equal (#597)
* test: e2e suite testing destination disable * fix: check alert threshold larger or equal
1 parent 859b1de commit cda99eb

File tree

3 files changed

+229
-2
lines changed

3 files changed

+229
-2
lines changed

cmd/e2e/suites_test.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,3 +223,150 @@ func TestBasicSuiteWithDeploymentID(t *testing.T) {
223223
deploymentID: "dp_e2e_test",
224224
})
225225
}
226+
227+
// TestAutoDisableWithoutCallbackURL tests the scenario from issue #596:
228+
// ALERT_AUTO_DISABLE_DESTINATION=true without ALERT_CALLBACK_URL set.
229+
// Run with: go test -v -run TestAutoDisableWithoutCallbackURL ./cmd/e2e/...
230+
func TestAutoDisableWithoutCallbackURL(t *testing.T) {
231+
t.Parallel()
232+
if testing.Short() {
233+
t.Skip("skipping e2e test")
234+
}
235+
236+
// Setup infrastructure
237+
testinfraCleanup := testinfra.Start(t)
238+
defer testinfraCleanup()
239+
gin.SetMode(gin.TestMode)
240+
mockServerBaseURL := testinfra.GetMockServer(t)
241+
242+
// Configure WITHOUT alert callback URL (the issue #596 scenario)
243+
cfg := configs.Basic(t, configs.BasicOpts{
244+
LogStorage: configs.LogStorageTypePostgres,
245+
})
246+
cfg.Alert.CallbackURL = "" // No callback URL
247+
cfg.Alert.AutoDisableDestination = true // Auto-disable enabled
248+
cfg.Alert.ConsecutiveFailureCount = 20 // Default threshold
249+
250+
require.NoError(t, cfg.Validate(config.Flags{}))
251+
252+
// Start application
253+
ctx, cancel := context.WithCancel(context.Background())
254+
defer cancel()
255+
256+
appDone := make(chan struct{})
257+
go func() {
258+
defer close(appDone)
259+
application := app.New(&cfg)
260+
if err := application.Run(ctx); err != nil {
261+
log.Println("Application stopped:", err)
262+
}
263+
}()
264+
defer func() {
265+
cancel()
266+
<-appDone
267+
}()
268+
269+
// Wait for services to start
270+
time.Sleep(2 * time.Second)
271+
272+
// Setup test client
273+
client := httpclient.New(fmt.Sprintf("http://localhost:%d/api/v1", cfg.APIPort), cfg.APIKey)
274+
mockServerInfra := testinfra.NewMockServerInfra(mockServerBaseURL)
275+
276+
// Test data
277+
tenantID := fmt.Sprintf("tenant_%d", time.Now().UnixNano())
278+
destinationID := fmt.Sprintf("dest_%d", time.Now().UnixNano())
279+
secret := "testsecret1234567890abcdefghijklmnop"
280+
281+
// Create tenant
282+
resp, err := client.Do(httpclient.Request{
283+
Method: httpclient.MethodPUT,
284+
Path: "/" + tenantID,
285+
Headers: map[string]string{"Authorization": "Bearer " + cfg.APIKey},
286+
})
287+
require.NoError(t, err)
288+
require.Equal(t, 201, resp.StatusCode, "failed to create tenant")
289+
290+
// Register the destination with the mock server (failures are requested per event via the "should_err" metadata set when publishing)
291+
resp, err = client.Do(httpclient.Request{
292+
Method: httpclient.MethodPUT,
293+
BaseURL: mockServerBaseURL,
294+
Path: "/destinations",
295+
Body: map[string]interface{}{
296+
"id": destinationID,
297+
"type": "webhook",
298+
"config": map[string]interface{}{
299+
"url": fmt.Sprintf("%s/webhook/%s", mockServerBaseURL, destinationID),
300+
},
301+
"credentials": map[string]interface{}{
302+
"secret": secret,
303+
},
304+
},
305+
})
306+
require.NoError(t, err)
307+
require.Equal(t, 200, resp.StatusCode, "failed to configure mock server")
308+
309+
// Create destination
310+
resp, err = client.Do(httpclient.Request{
311+
Method: httpclient.MethodPOST,
312+
Path: "/" + tenantID + "/destinations",
313+
Headers: map[string]string{"Authorization": "Bearer " + cfg.APIKey},
314+
Body: map[string]interface{}{
315+
"id": destinationID,
316+
"type": "webhook",
317+
"topics": "*",
318+
"config": map[string]interface{}{
319+
"url": fmt.Sprintf("%s/webhook/%s", mockServerBaseURL, destinationID),
320+
},
321+
"credentials": map[string]interface{}{
322+
"secret": secret,
323+
},
324+
},
325+
})
326+
require.NoError(t, err)
327+
require.Equal(t, 201, resp.StatusCode, "failed to create destination")
328+
329+
// Publish 21 events that will fail: one more than the threshold of 20, to exercise the >= check and confirm that disabling again past the threshold is idempotent
330+
for i := 0; i < 21; i++ {
331+
resp, err = client.Do(httpclient.Request{
332+
Method: httpclient.MethodPOST,
333+
Path: "/publish",
334+
Headers: map[string]string{"Authorization": "Bearer " + cfg.APIKey},
335+
Body: map[string]interface{}{
336+
"tenant_id": tenantID,
337+
"topic": "user.created",
338+
"eligible_for_retry": false,
339+
"metadata": map[string]any{
340+
"should_err": "true",
341+
},
342+
"data": map[string]any{
343+
"index": i,
344+
},
345+
},
346+
})
347+
require.NoError(t, err)
348+
require.Equal(t, 202, resp.StatusCode, "failed to publish event %d", i)
349+
}
350+
351+
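// Wait for deliveries to be processed (fixed 1s delay; NOTE(review): may be flaky on slow runners - consider polling for disabled_at instead)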
352+
time.Sleep(time.Second)
353+
354+
// Check if destination is disabled
355+
resp, err = client.Do(httpclient.Request{
356+
Method: httpclient.MethodGET,
357+
Path: "/" + tenantID + "/destinations/" + destinationID,
358+
Headers: map[string]string{"Authorization": "Bearer " + cfg.APIKey},
359+
})
360+
require.NoError(t, err)
361+
require.Equal(t, 200, resp.StatusCode, "failed to get destination")
362+
363+
// Parse response to check disabled_at
364+
bodyMap, ok := resp.Body.(map[string]interface{})
365+
require.True(t, ok, "response body should be a map")
366+
367+
disabledAt := bodyMap["disabled_at"]
368+
assert.NotNil(t, disabledAt, "destination should be disabled (disabled_at should not be null) - issue #596")
369+
370+
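// mockServerInfra is not used by this test; the blank assignment only silences the unused-variable error (no cleanup is performed here)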
371+
_ = mockServerInfra
372+
}

internal/alert/evaluator.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,21 @@ func (e *alertEvaluator) ShouldAlert(failures int) (int, bool) {
6969
}
7070

7171
// Get current alert level
72+
// Iterate from highest to lowest threshold
7273
for i := len(e.thresholds) - 1; i >= 0; i-- {
73-
if failures == e.thresholds[i].failures {
74-
return e.thresholds[i].percentage, true
74+
threshold := e.thresholds[i]
75+
76+
// For the 100% threshold (auto-disable), use >= to ensure we don't miss it
77+
// if concurrent processing causes us to skip over the exact count.
78+
// For other thresholds, use exact match to avoid duplicate alerts. Consequently only the 100% alert fires again on every failure past its threshold.
79+
if threshold.percentage == 100 {
80+
if failures >= threshold.failures {
81+
return threshold.percentage, true
82+
}
83+
} else {
84+
if failures == threshold.failures {
85+
return threshold.percentage, true
86+
}
7587
}
7688
}
7789

internal/alert/monitor_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,3 +168,71 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) {
168168
// Verify the destination was never disabled
169169
disabler.AssertNotCalled(t, "DisableDestination")
170170
}
171+
172+
func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t *testing.T) {
173+
// Tests that failures above the 100% threshold still trigger disable.
174+
// This ensures we don't miss the disable if concurrent processing
175+
// causes us to skip over the exact threshold count.
176+
t.Parallel()
177+
ctx := context.Background()
178+
logger := testutil.CreateTestLogger(t)
179+
redisClient := testutil.CreateTestRedisClient(t)
180+
notifier := &mockAlertNotifier{}
181+
notifier.On("Notify", mock.Anything, mock.Anything).Return(nil)
182+
disabler := &mockDestinationDisabler{}
183+
disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(nil)
184+
185+
monitor := alert.NewAlertMonitor(
186+
logger,
187+
redisClient,
188+
alert.WithNotifier(notifier),
189+
alert.WithDisabler(disabler),
190+
alert.WithAutoDisableFailureCount(20),
191+
alert.WithAlertThresholds([]int{50, 70, 90, 100}),
192+
)
193+
194+
dest := &alert.AlertDestination{ID: "dest_above", TenantID: "tenant_above"}
195+
event := &models.Event{Topic: "test.event"}
196+
deliveryEvent := &models.DeliveryEvent{Event: *event}
197+
attempt := alert.DeliveryAttempt{
198+
Success: false,
199+
DeliveryEvent: deliveryEvent,
200+
Destination: dest,
201+
DeliveryResponse: map[string]interface{}{
202+
"status": "500",
203+
},
204+
Timestamp: time.Now(),
205+
}
206+
207+
// Send 25 consecutive failures (5 more than the threshold)
208+
for i := 1; i <= 25; i++ {
209+
require.NoError(t, monitor.HandleAttempt(ctx, attempt))
210+
}
211+
212+
// Verify notifications at 50%, 70%, 90%, and 100% thresholds
213+
// Plus additional notifications for failures 21-25 (all at 100% level)
214+
var notifyCallCount int
215+
var disableNotifyCount int
216+
for _, call := range notifier.Calls {
217+
if call.Method == "Notify" {
218+
notifyCallCount++
219+
alertData := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert)
220+
if alertData.Data.ConsecutiveFailures >= 20 {
221+
disableNotifyCount++
222+
require.True(t, alertData.Data.WillDisable, "WillDisable should be true at and above 100%")
223+
}
224+
}
225+
}
226+
// 4 alerts at thresholds (10, 14, 18, 20) + 5 alerts for 21-25
227+
require.Equal(t, 9, notifyCallCount, "Should have sent 9 notifications (4 at thresholds + 5 above)")
228+
require.Equal(t, 6, disableNotifyCount, "Should have 6 notifications with WillDisable=true (20-25)")
229+
230+
// Verify destination was disabled multiple times (once per failure >= 20)
231+
var disableCallCount int
232+
for _, call := range disabler.Calls {
233+
if call.Method == "DisableDestination" {
234+
disableCallCount++
235+
}
236+
}
237+
require.Equal(t, 6, disableCallCount, "Should have called disable 6 times (for failures 20-25)")
238+
}

0 commit comments

Comments
 (0)