Skip to content

Commit

Permalink
Tests: Recover-Failed-Webhooks endpoint
Browse files Browse the repository at this point in the history
Add tests to ensure that failed webhooks are retried with expected count based on `since` time specified
Ensure recover requests are rejected if `since` time exceeds age limit
  • Loading branch information
jaymell committed Apr 5, 2022
1 parent 967d0ff commit e12b51a
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 70 deletions.
4 changes: 2 additions & 2 deletions server/svix-server/src/v1/endpoints/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ pub struct EndpointSecretOut {
key: EndpointSecret,
}

#[derive(Clone, Debug, PartialEq, Validate, Deserialize)]
#[derive(Clone, Debug, PartialEq, Validate, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RecoverIn {
since: DateTime<Utc>,
pub since: DateTime<Utc>,
}

#[derive(Clone, Debug, PartialEq, Validate, Deserialize)]
Expand Down
65 changes: 4 additions & 61 deletions server/svix-server/tests/e2e_attempt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ use svix_server::{
mod utils;

use utils::{
common_calls::{create_test_app, create_test_endpoint, create_test_message},
get_default_test_config, run_with_retries, start_svix_server, start_svix_server_with_cfg,
TestReceiver,
common_calls::{
create_test_app, create_test_endpoint, create_test_message, try_message_attempts,
},
run_with_retries, start_svix_server, TestReceiver,
};

use tokio::time::{sleep, Duration};

#[tokio::test]
async fn test_list_attempted_messages() {
let (client, _jh) = start_svix_server();
Expand Down Expand Up @@ -152,62 +151,6 @@ async fn test_list_attempts_by_endpoint() {
receiver_2.jh.abort();
}

async fn try_message_attempts(
endpoint: &str,
msg_status: MessageStatus,
status_code: i16,
attempt_count: Option<usize>,
) {
let mut cfg = get_default_test_config();
cfg.retry_schedule = (0..2)
.into_iter()
.map(|_| Duration::from_millis(1))
.collect();

let (client, _jh) = start_svix_server_with_cfg(&cfg);

let app_id = create_test_app(&client, "app1").await.unwrap().id;

let endp_id = create_test_endpoint(&client, &app_id, endpoint)
.await
.unwrap()
.id;

let msg = create_test_message(&client, &app_id, serde_json::json!({"test": "data1"}))
.await
.unwrap();

for i in &cfg.retry_schedule {
sleep(*i).await;
}
// Give attempts buffer time to complete:
sleep(Duration::from_millis(50)).await;

let list = run_with_retries(|| async {
let list: ListResponse<MessageAttemptOut> = client
.get(
&format!("api/v1/app/{}/attempt/msg/{}", app_id, &msg.id),
StatusCode::OK,
)
.await
.unwrap();

let attempt_count = attempt_count.unwrap_or(cfg.retry_schedule.len() + 1);
if list.data.len() != attempt_count {
anyhow::bail!("Attempt count does not match retry_schedule length");
}
Ok(list)
})
.await
.unwrap();

for i in list.data.iter() {
assert_eq!(i.status, msg_status);
assert_eq!(i.response_status_code, status_code);
assert_eq!(i.endpoint_id, endp_id);
}
}

#[tokio::test]
async fn test_message_attempts() {
for (status_code, msg_status, attempt_count) in [
Expand Down
153 changes: 149 additions & 4 deletions server/svix-server/tests/e2e_endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,34 @@
// SPDX-FileCopyrightText: © 2022 Svix Authors
// SPDX-License-Identifier: MIT

use std::time::Duration;

use anyhow::Result;
use chrono::{DateTime, Utc};
use reqwest::StatusCode;

use svix_server::{
core::types::{ApplicationId, EndpointUid},
core::types::{ApplicationId, EndpointId, EndpointUid, MessageStatus},
v1::{
endpoints::endpoint::{EndpointIn, EndpointOut, EndpointSecretOut},
endpoints::{
endpoint::{EndpointIn, EndpointOut, EndpointSecretOut, RecoverIn},
message::MessageOut,
},
utils::ListResponse,
},
};

mod utils;

use tokio::time::sleep;
use utils::{
common_calls::{
common_test_list, create_test_app, create_test_endpoint, delete_test_app, endpoint_in,
post_endpoint,
get_and_assert_msg_attempt_list, post_endpoint, recover_webhooks,
try_message_attempts_with_params,
},
start_svix_server, IgnoredResponse, TestClient,
get_default_test_config, start_svix_server, start_svix_server_with_cfg, IgnoredResponse,
TestClient, TestReceiver,
};

async fn get_endpoint(
Expand Down Expand Up @@ -379,3 +388,139 @@ async fn test_endpoint_secret_get_and_rotation() {
.unwrap()
);
}

async fn create_failed_message_attempts() -> (
TestClient,
ApplicationId,
EndpointId,
MessageOut,
MessageOut,
usize,
[DateTime<Utc>; 3],
) {
let mut cfg = get_default_test_config();
cfg.retry_schedule = (0..2)
.into_iter()
.map(|_| Duration::from_millis(1))
.collect();

let (client, _jh) = start_svix_server_with_cfg(&cfg);

let app_id = create_test_app(&client, "app1").await.unwrap().id;

let receiver = TestReceiver::start(StatusCode::INTERNAL_SERVER_ERROR);

let endp_id = create_test_endpoint(&client, &app_id, &receiver.endpoint)
.await
.unwrap()
.id;

let before_msg_1 = Utc::now();

let msg_1 = try_message_attempts_with_params(
&app_id,
&endp_id,
&client,
&cfg.retry_schedule,
MessageStatus::Fail,
500,
None,
)
.await;

let before_msg_2 = Utc::now();

let msg_2 = try_message_attempts_with_params(
&app_id,
&endp_id,
&client,
&cfg.retry_schedule,
MessageStatus::Fail,
500,
None,
)
.await;

let after_msg_2 = Utc::now();

receiver.jh.abort();

(
client,
app_id,
endp_id,
msg_1,
msg_2,
cfg.retry_schedule.len() + 1,
[before_msg_1, before_msg_2, after_msg_2],
)
}

#[tokio::test]
async fn test_recovery_should_fail_if_start_time_too_old() {
let (client, _jh) = start_svix_server();

let app_id = create_test_app(&client, "app1").await.unwrap().id;

let receiver = TestReceiver::start(StatusCode::INTERNAL_SERVER_ERROR);

let endp_id = create_test_endpoint(&client, &app_id, &receiver.endpoint)
.await
.unwrap()
.id;

let _: serde_json::Value = client
.post(
&format!("api/v1/app/{}/endpoint/{}/recover/", app_id, endp_id),
RecoverIn {
since: Utc::now() - chrono::Duration::weeks(3),
},
StatusCode::UNPROCESSABLE_ENTITY,
)
.await
.unwrap();
}

#[tokio::test]
async fn test_recovery_expected_retry_counts() {
sleep(Duration::from_millis(50)).await;

for (i, (msg_1_retry_cnt, msg2_retry_cnt)) in [
// expected number of additional retry attempts for (msg1, msg2) if recover `since` is before msg 1:
(1, 1),
// expected values if recover `since` is before msg 2:
(0, 1),
// expected values if recover `since` is after msg 2:
(0, 0),
]
.iter()
.enumerate()
{
let (client, app_id, endp_id, msg_1, msg_2, base_attempt_cnt, times) =
create_failed_message_attempts().await;

recover_webhooks(
&client,
times[i],
&format!("api/v1/app/{}/endpoint/{}/recover/", app_id, endp_id),
)
.await;

sleep(Duration::from_millis(50)).await;

get_and_assert_msg_attempt_list(
&client,
&app_id,
&msg_1.id,
base_attempt_cnt + msg_1_retry_cnt,
)
.await;
get_and_assert_msg_attempt_list(
&client,
&app_id,
&msg_2.id,
base_attempt_cnt + msg2_retry_cnt,
)
.await;
}
}
Loading

0 comments on commit e12b51a

Please sign in to comment.