From c0643a9c76c2e08aace7d031803ccf5ac1526be3 Mon Sep 17 00:00:00 2001 From: James Lucas Date: Tue, 5 Apr 2022 15:06:18 -0500 Subject: [PATCH] Tests: Recover-Failed-Webhooks endpoint (#365) Add tests to ensure that failed webhooks are retried with expected count based on `since` time specified, and ensure recover requests are rejected if `since` time exceeds age limit --- .../src/v1/endpoints/endpoint/mod.rs | 4 +- server/svix-server/tests/e2e_attempt.rs | 123 ++++++++------- server/svix-server/tests/e2e_endpoint.rs | 147 +++++++++++++++++- .../svix-server/tests/utils/common_calls.rs | 48 +++++- 4 files changed, 254 insertions(+), 68 deletions(-) diff --git a/server/svix-server/src/v1/endpoints/endpoint/mod.rs b/server/svix-server/src/v1/endpoints/endpoint/mod.rs index 738b3632c..92e654fb1 100644 --- a/server/svix-server/src/v1/endpoints/endpoint/mod.rs +++ b/server/svix-server/src/v1/endpoints/endpoint/mod.rs @@ -157,10 +157,10 @@ pub struct EndpointSecretOut { key: EndpointSecret, } -#[derive(Clone, Debug, PartialEq, Validate, Deserialize)] +#[derive(Clone, Debug, PartialEq, Validate, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct RecoverIn { - since: DateTime, + pub since: DateTime, } #[derive(Clone, Debug, PartialEq, Validate, Deserialize)] diff --git a/server/svix-server/tests/e2e_attempt.rs b/server/svix-server/tests/e2e_attempt.rs index 983799761..88aeab153 100644 --- a/server/svix-server/tests/e2e_attempt.rs +++ b/server/svix-server/tests/e2e_attempt.rs @@ -14,12 +14,15 @@ use svix_server::{ mod utils; use utils::{ - common_calls::{create_test_app, create_test_endpoint, create_test_message}, + common_calls::{ + create_test_app, create_test_endpoint, create_test_message, + get_msg_attempt_list_and_assert_count, + }, get_default_test_config, run_with_retries, start_svix_server, start_svix_server_with_cfg, TestReceiver, }; -use tokio::time::{sleep, Duration}; +use std::time::Duration; #[tokio::test] async fn test_list_attempted_messages() { @@ -152,12 +155,8 @@ async fn test_list_attempts_by_endpoint() { receiver_2.jh.abort(); } -async fn try_message_attempts( - endpoint: &str, - msg_status: MessageStatus, - status_code: i16, - attempt_count: Option, -) { +#[tokio::test] +async fn test_message_attempts() { let mut cfg = get_default_test_config(); cfg.retry_schedule = (0..2) .into_iter() @@ -166,50 +165,6 @@ async fn try_message_attempts( let (client, _jh) = start_svix_server_with_cfg(&cfg); - let app_id = create_test_app(&client, "app1").await.unwrap().id; - - let endp_id = create_test_endpoint(&client, &app_id, endpoint) - .await - .unwrap() - .id; - - let msg = create_test_message(&client, &app_id, serde_json::json!({"test": "data1"})) - .await - .unwrap(); - - for i in &cfg.retry_schedule { - sleep(*i).await; - } - // Give attempts buffer time to complete: - sleep(Duration::from_millis(50)).await; - - let list = run_with_retries(|| async { - let list: ListResponse = client - .get( - &format!("api/v1/app/{}/attempt/msg/{}", app_id, &msg.id), - StatusCode::OK, - ) - .await - .unwrap(); - - let attempt_count = attempt_count.unwrap_or(cfg.retry_schedule.len() + 1); - if list.data.len() != attempt_count { - anyhow::bail!("Attempt count does not match retry_schedule length"); - } - Ok(list) - }) - .await - .unwrap(); - - for i in list.data.iter() { - assert_eq!(i.status, msg_status); - assert_eq!(i.response_status_code, status_code); - assert_eq!(i.endpoint_id, endp_id); - } -} - -#[tokio::test] -async fn test_message_attempts() { for (status_code, msg_status, attempt_count) in [ // Success (StatusCode::OK, MessageStatus::Success, Some(1)), @@ -218,21 +173,71 @@ async fn test_message_attempts() { // HTTP 500 (StatusCode::INTERNAL_SERVER_ERROR, MessageStatus::Fail, None), ] { + let app_id = create_test_app(&client, "app").await.unwrap().id; + let receiver = TestReceiver::start(status_code); - try_message_attempts( - &receiver.endpoint, - msg_status, - status_code.as_u16().try_into().unwrap(), - attempt_count, + + let endp_id = create_test_endpoint(&client, &app_id, &receiver.endpoint) + .await + .unwrap() + .id; + + let msg = create_test_message(&client, &app_id, serde_json::json!({"test": "data"})) + .await + .unwrap(); + + let list = get_msg_attempt_list_and_assert_count( + &client, + &app_id, + &msg.id, + attempt_count.unwrap_or(&cfg.retry_schedule.len() + 1), ) - .await; + .await + .unwrap(); + + for i in list.data.iter() { + assert_eq!(i.status, msg_status); + println!("{} {}", i.response_status_code, status_code); + assert_eq!( + i.response_status_code, + TryInto::::try_into(status_code.as_u16()).unwrap() + ); + assert_eq!(i.endpoint_id, endp_id); + } receiver.jh.abort(); } // non-HTTP-related failures: + let app_id = create_test_app(&client, "app").await.unwrap().id; + let receiver = TestReceiver::start(StatusCode::OK); + + // stop receiver before beginning tests: receiver.jh.abort(); - try_message_attempts(&receiver.endpoint, MessageStatus::Fail, 0, None).await; + + let endp_id = create_test_endpoint(&client, &app_id, &receiver.endpoint) + .await + .unwrap() + .id; + + let msg = create_test_message(&client, &app_id, serde_json::json!({"test": "data1"})) + .await + .unwrap(); + + let list = get_msg_attempt_list_and_assert_count( + &client, + &app_id, + &msg.id, + &cfg.retry_schedule.len() + 1, + ) + .await + .unwrap(); + + for i in list.data.iter() { + assert_eq!(i.status, MessageStatus::Fail); + assert_eq!(i.response_status_code, 0); + assert_eq!(i.endpoint_id, endp_id); + } } #[tokio::test] diff --git a/server/svix-server/tests/e2e_endpoint.rs b/server/svix-server/tests/e2e_endpoint.rs index aa1d1e774..d6dcc25e5 100644 --- a/server/svix-server/tests/e2e_endpoint.rs +++ b/server/svix-server/tests/e2e_endpoint.rs @@ -1,25 +1,34 @@ // SPDX-FileCopyrightText: © 2022 Svix Authors // SPDX-License-Identifier: MIT +use std::time::Duration; + use anyhow::Result; +use chrono::{DateTime, Utc}; use reqwest::StatusCode; use svix_server::{ - core::types::{ApplicationId, EndpointUid}, + core::types::{ApplicationId, EndpointId, EndpointUid}, v1::{ - endpoints::endpoint::{EndpointIn, EndpointOut, EndpointSecretOut}, + endpoints::{ + endpoint::{EndpointIn, EndpointOut, EndpointSecretOut, RecoverIn}, + message::MessageOut, + }, utils::ListResponse, }, }; mod utils; +use tokio::time::sleep; use utils::{ common_calls::{ - common_test_list, create_test_app, create_test_endpoint, delete_test_app, endpoint_in, - post_endpoint, + common_test_list, create_test_app, create_test_endpoint, create_test_message, + delete_test_app, endpoint_in, get_msg_attempt_list_and_assert_count, post_endpoint, + recover_webhooks, wait_for_msg_retries, }, - start_svix_server, IgnoredResponse, TestClient, + get_default_test_config, start_svix_server, start_svix_server_with_cfg, IgnoredResponse, + TestClient, TestReceiver, }; async fn get_endpoint( @@ -379,3 +388,131 @@ async fn test_endpoint_secret_get_and_rotation() { .unwrap() ); } + +#[tokio::test] +async fn test_recovery_should_fail_if_start_time_too_old() { + let (client, _jh) = start_svix_server(); + + let app_id = create_test_app(&client, "app1").await.unwrap().id; + + let receiver = TestReceiver::start(StatusCode::INTERNAL_SERVER_ERROR); + + let endp_id = create_test_endpoint(&client, &app_id, &receiver.endpoint) + .await + .unwrap() + .id; + + let _: serde_json::Value = client + .post( + &format!("api/v1/app/{}/endpoint/{}/recover/", app_id, endp_id), + RecoverIn { + since: Utc::now() - chrono::Duration::weeks(3), + }, + StatusCode::UNPROCESSABLE_ENTITY, + ) + .await + .unwrap(); +} + +async fn create_failed_message_attempts() -> ( + TestClient, + ApplicationId, + EndpointId, + MessageOut, + MessageOut, + usize, + [DateTime; 3], +) { + let mut cfg = get_default_test_config(); + cfg.retry_schedule = (0..2) + .into_iter() + .map(|_| Duration::from_millis(1)) + .collect(); + + let (client, _jh) = start_svix_server_with_cfg(&cfg); + + let app_id = create_test_app(&client, "app1").await.unwrap().id; + + let receiver = TestReceiver::start(StatusCode::INTERNAL_SERVER_ERROR); + + let endp_id = create_test_endpoint(&client, &app_id, &receiver.endpoint) + .await + .unwrap() + .id; + + let before_msg_1 = Utc::now(); + + let msg_1 = create_test_message(&client, &app_id, serde_json::json!({"test": "data1"})) + .await + .unwrap(); + + wait_for_msg_retries(&cfg.retry_schedule).await; + + let before_msg_2 = Utc::now(); + + let msg_2 = create_test_message(&client, &app_id, serde_json::json!({"test": "data2"})) + .await + .unwrap(); + + wait_for_msg_retries(&cfg.retry_schedule).await; + + let after_msg_2 = Utc::now(); + + receiver.jh.abort(); + + ( + client, + app_id, + endp_id, + msg_1, + msg_2, + cfg.retry_schedule.len() + 1, + [before_msg_1, before_msg_2, after_msg_2], + ) +} + +#[tokio::test] +async fn test_recovery_expected_retry_counts() { + sleep(Duration::from_millis(50)).await; + + for (i, (msg_1_retry_cnt, msg2_retry_cnt)) in [ + // expected number of additional retry attempts for (msg1, msg2) if recover `since` is before msg 1: + (1, 1), + // expected values if recover `since` is before msg 2: + (0, 1), + // expected values if recover `since` is after msg 2: + (0, 0), + ] + .iter() + .enumerate() + { + let (client, app_id, endp_id, msg_1, msg_2, base_attempt_cnt, times) = + create_failed_message_attempts().await; + + recover_webhooks( + &client, + times[i], + &format!("api/v1/app/{}/endpoint/{}/recover/", app_id, endp_id), + ) + .await; + + sleep(Duration::from_millis(50)).await; + + get_msg_attempt_list_and_assert_count( + &client, + &app_id, + &msg_1.id, + base_attempt_cnt + msg_1_retry_cnt, + ) + .await + .unwrap(); + get_msg_attempt_list_and_assert_count( + &client, + &app_id, + &msg_2.id, + base_attempt_cnt + msg2_retry_cnt, + ) + .await + .unwrap(); + } +} diff --git a/server/svix-server/tests/utils/common_calls.rs b/server/svix-server/tests/utils/common_calls.rs index 3ec059939..152ab0f84 100644 --- a/server/svix-server/tests/utils/common_calls.rs +++ b/server/svix-server/tests/utils/common_calls.rs @@ -4,21 +4,24 @@ use std::time::Duration; use anyhow::Result; +use chrono::{DateTime, Utc}; use reqwest::StatusCode; use serde::{de::DeserializeOwned, Serialize}; use svix_server::{ - core::types::{ApplicationId, EventTypeName}, + core::types::{ApplicationId, EventTypeName, MessageId}, v1::{ endpoints::{ application::{ApplicationIn, ApplicationOut}, - endpoint::{EndpointIn, EndpointOut}, + attempt::MessageAttemptOut, + endpoint::{EndpointIn, EndpointOut, RecoverIn}, event_type::EventTypeIn, message::{MessageIn, MessageOut}, }, utils::ListResponse, }, }; +use tokio::time::sleep; use super::{run_with_retries, IgnoredResponse, TestClient}; @@ -208,3 +211,44 @@ pub async fn common_test_list< Ok(()) } + +pub async fn wait_for_msg_retries(retry_schedule: &[Duration]) { + for i in retry_schedule { + sleep(*i).await; + } + // Give attempts buffer time to complete: + sleep(Duration::from_millis(50)).await; +} + +pub async fn recover_webhooks(client: &TestClient, since: DateTime, url: &str) { + let _: serde_json::Value = client + .post(url, RecoverIn { since }, StatusCode::ACCEPTED) + .await + .unwrap(); +} + +pub async fn get_msg_attempt_list_and_assert_count( + client: &TestClient, + app_id: &ApplicationId, + msg_id: &MessageId, + expected_count: usize, +) -> Result> { + run_with_retries(|| async { + let list: ListResponse = client + .get( + &format!("api/v1/app/{}/attempt/msg/{}", app_id, msg_id), + StatusCode::OK, + ) + .await?; + + if list.data.len() != expected_count { + anyhow::bail!( + "Attempt count {} does not match expected length {}", + list.data.len(), + expected_count + ); + } + Ok(list) + }) + .await +}