Skip to content

Commit

Permalink
Tests: Recover-Failed-Webhooks endpoint
Browse files Browse the repository at this point in the history
Add tests to ensure that failed webhooks are retried with expected count based on `since` time specified,
and ensure recover requests are rejected if `since` time exceeds age limit
  • Loading branch information
jaymell committed Apr 5, 2022
1 parent 967d0ff commit 6df7f80
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 68 deletions.
4 changes: 2 additions & 2 deletions server/svix-server/src/v1/endpoints/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ pub struct EndpointSecretOut {
key: EndpointSecret,
}

#[derive(Clone, Debug, PartialEq, Validate, Deserialize)]
#[derive(Clone, Debug, PartialEq, Validate, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RecoverIn {
since: DateTime<Utc>,
pub since: DateTime<Utc>,
}

#[derive(Clone, Debug, PartialEq, Validate, Deserialize)]
Expand Down
123 changes: 64 additions & 59 deletions server/svix-server/tests/e2e_attempt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ use svix_server::{
mod utils;

use utils::{
common_calls::{create_test_app, create_test_endpoint, create_test_message},
common_calls::{
create_test_app, create_test_endpoint, create_test_message,
get_msg_attempt_list_and_assert_count,
},
get_default_test_config, run_with_retries, start_svix_server, start_svix_server_with_cfg,
TestReceiver,
};

use tokio::time::{sleep, Duration};
use std::time::Duration;

#[tokio::test]
async fn test_list_attempted_messages() {
Expand Down Expand Up @@ -152,12 +155,8 @@ async fn test_list_attempts_by_endpoint() {
receiver_2.jh.abort();
}

async fn try_message_attempts(
endpoint: &str,
msg_status: MessageStatus,
status_code: i16,
attempt_count: Option<usize>,
) {
#[tokio::test]
async fn test_message_attempts() {
let mut cfg = get_default_test_config();
cfg.retry_schedule = (0..2)
.into_iter()
Expand All @@ -166,50 +165,6 @@ async fn try_message_attempts(

let (client, _jh) = start_svix_server_with_cfg(&cfg);

let app_id = create_test_app(&client, "app1").await.unwrap().id;

let endp_id = create_test_endpoint(&client, &app_id, endpoint)
.await
.unwrap()
.id;

let msg = create_test_message(&client, &app_id, serde_json::json!({"test": "data1"}))
.await
.unwrap();

for i in &cfg.retry_schedule {
sleep(*i).await;
}
// Give attempts buffer time to complete:
sleep(Duration::from_millis(50)).await;

let list = run_with_retries(|| async {
let list: ListResponse<MessageAttemptOut> = client
.get(
&format!("api/v1/app/{}/attempt/msg/{}", app_id, &msg.id),
StatusCode::OK,
)
.await
.unwrap();

let attempt_count = attempt_count.unwrap_or(cfg.retry_schedule.len() + 1);
if list.data.len() != attempt_count {
anyhow::bail!("Attempt count does not match retry_schedule length");
}
Ok(list)
})
.await
.unwrap();

for i in list.data.iter() {
assert_eq!(i.status, msg_status);
assert_eq!(i.response_status_code, status_code);
assert_eq!(i.endpoint_id, endp_id);
}
}

#[tokio::test]
async fn test_message_attempts() {
for (status_code, msg_status, attempt_count) in [
// Success
(StatusCode::OK, MessageStatus::Success, Some(1)),
Expand All @@ -218,19 +173,69 @@ async fn test_message_attempts() {
// HTTP 500
(StatusCode::INTERNAL_SERVER_ERROR, MessageStatus::Fail, None),
] {
let app_id = create_test_app(&client, "app").await.unwrap().id;

let receiver = TestReceiver::start(status_code);
try_message_attempts(
&receiver.endpoint,
msg_status,
status_code.as_u16().try_into().unwrap(),
attempt_count,

let endp_id = create_test_endpoint(&client, &app_id, &receiver.endpoint)
.await
.unwrap()
.id;

let msg = create_test_message(&client, &app_id, serde_json::json!({"test": "data"}))
.await
.unwrap();

let list = get_msg_attempt_list_and_assert_count(
&client,
&app_id,
&msg.id,
attempt_count.unwrap_or(&cfg.retry_schedule.len() + 1),
)
.await;
.await
.unwrap();

for i in list.data.iter() {
assert_eq!(i.status, msg_status);
println!("{} {}", i.response_status_code, status_code);
assert_eq!(
i.response_status_code,
TryInto::<i16>::try_into(status_code.as_u16()).unwrap()
);
assert_eq!(i.endpoint_id, endp_id);
}
receiver.jh.abort();
}

// non-HTTP-related failures:
let app_id = create_test_app(&client, "app").await.unwrap().id;

let receiver = TestReceiver::start(StatusCode::OK);

// stop receiver before beginning tests:
receiver.jh.abort();
try_message_attempts(&receiver.endpoint, MessageStatus::Fail, 0, None).await;

let endp_id = create_test_endpoint(&client, &app_id, &receiver.endpoint)
.await
.unwrap()
.id;

let msg = create_test_message(&client, &app_id, serde_json::json!({"test": "data1"}))
.await
.unwrap();

let list = get_msg_attempt_list_and_assert_count(
&client,
&app_id,
&msg.id,
&cfg.retry_schedule.len() + 1,
)
.await
.unwrap();

for i in list.data.iter() {
assert_eq!(i.status, MessageStatus::Fail);
assert_eq!(i.response_status_code, 0);
assert_eq!(i.endpoint_id, endp_id);
}
}
147 changes: 142 additions & 5 deletions server/svix-server/tests/e2e_endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,34 @@
// SPDX-FileCopyrightText: © 2022 Svix Authors
// SPDX-License-Identifier: MIT

use std::time::Duration;

use anyhow::Result;
use chrono::{DateTime, Utc};
use reqwest::StatusCode;

use svix_server::{
core::types::{ApplicationId, EndpointUid},
core::types::{ApplicationId, EndpointId, EndpointUid},
v1::{
endpoints::endpoint::{EndpointIn, EndpointOut, EndpointSecretOut},
endpoints::{
endpoint::{EndpointIn, EndpointOut, EndpointSecretOut, RecoverIn},
message::MessageOut,
},
utils::ListResponse,
},
};

mod utils;

use tokio::time::sleep;
use utils::{
common_calls::{
common_test_list, create_test_app, create_test_endpoint, delete_test_app, endpoint_in,
post_endpoint,
common_test_list, create_test_app, create_test_endpoint, create_test_message,
delete_test_app, endpoint_in, get_msg_attempt_list_and_assert_count, post_endpoint,
recover_webhooks, wait_for_msg_retries,
},
start_svix_server, IgnoredResponse, TestClient,
get_default_test_config, start_svix_server, start_svix_server_with_cfg, IgnoredResponse,
TestClient, TestReceiver,
};

async fn get_endpoint(
Expand Down Expand Up @@ -379,3 +388,131 @@ async fn test_endpoint_secret_get_and_rotation() {
.unwrap()
);
}

#[tokio::test]
async fn test_recovery_should_fail_if_start_time_too_old() {
let (client, _jh) = start_svix_server();

let app_id = create_test_app(&client, "app1").await.unwrap().id;

let receiver = TestReceiver::start(StatusCode::INTERNAL_SERVER_ERROR);

let endp_id = create_test_endpoint(&client, &app_id, &receiver.endpoint)
.await
.unwrap()
.id;

let _: serde_json::Value = client
.post(
&format!("api/v1/app/{}/endpoint/{}/recover/", app_id, endp_id),
RecoverIn {
since: Utc::now() - chrono::Duration::weeks(3),
},
StatusCode::UNPROCESSABLE_ENTITY,
)
.await
.unwrap();
}

async fn create_failed_message_attempts() -> (
TestClient,
ApplicationId,
EndpointId,
MessageOut,
MessageOut,
usize,
[DateTime<Utc>; 3],
) {
let mut cfg = get_default_test_config();
cfg.retry_schedule = (0..2)
.into_iter()
.map(|_| Duration::from_millis(1))
.collect();

let (client, _jh) = start_svix_server_with_cfg(&cfg);

let app_id = create_test_app(&client, "app1").await.unwrap().id;

let receiver = TestReceiver::start(StatusCode::INTERNAL_SERVER_ERROR);

let endp_id = create_test_endpoint(&client, &app_id, &receiver.endpoint)
.await
.unwrap()
.id;

let before_msg_1 = Utc::now();

let msg_1 = create_test_message(&client, &app_id, serde_json::json!({"test": "data1"}))
.await
.unwrap();

wait_for_msg_retries(&cfg.retry_schedule).await;

let before_msg_2 = Utc::now();

let msg_2 = create_test_message(&client, &app_id, serde_json::json!({"test": "data2"}))
.await
.unwrap();

wait_for_msg_retries(&cfg.retry_schedule).await;

let after_msg_2 = Utc::now();

receiver.jh.abort();

(
client,
app_id,
endp_id,
msg_1,
msg_2,
cfg.retry_schedule.len() + 1,
[before_msg_1, before_msg_2, after_msg_2],
)
}

#[tokio::test]
async fn test_recovery_expected_retry_counts() {
sleep(Duration::from_millis(50)).await;

for (i, (msg_1_retry_cnt, msg2_retry_cnt)) in [
// expected number of additional retry attempts for (msg1, msg2) if recover `since` is before msg 1:
(1, 1),
// expected values if recover `since` is before msg 2:
(0, 1),
// expected values if recover `since` is after msg 2:
(0, 0),
]
.iter()
.enumerate()
{
let (client, app_id, endp_id, msg_1, msg_2, base_attempt_cnt, times) =
create_failed_message_attempts().await;

recover_webhooks(
&client,
times[i],
&format!("api/v1/app/{}/endpoint/{}/recover/", app_id, endp_id),
)
.await;

sleep(Duration::from_millis(50)).await;

get_msg_attempt_list_and_assert_count(
&client,
&app_id,
&msg_1.id,
base_attempt_cnt + msg_1_retry_cnt,
)
.await
.unwrap();
get_msg_attempt_list_and_assert_count(
&client,
&app_id,
&msg_2.id,
base_attempt_cnt + msg2_retry_cnt,
)
.await
.unwrap();
}
}
Loading

0 comments on commit 6df7f80

Please sign in to comment.