diff --git a/server/svix-server/src/v1/endpoints/attempt.rs b/server/svix-server/src/v1/endpoints/attempt.rs index f8fe84680..1009d510b 100644 --- a/server/svix-server/src/v1/endpoints/attempt.rs +++ b/server/svix-server/src/v1/endpoints/attempt.rs @@ -26,7 +26,7 @@ use crate::{ AppState, }; use aide::axum::{ - routing::{get, post}, + routing::{delete, get, post}, ApiRouter, }; use axum::{ @@ -37,7 +37,10 @@ use chrono::{DateTime, Utc}; use hyper::StatusCode; use schemars::JsonSchema; -use sea_orm::{entity::prelude::*, sea_query::Expr, DatabaseConnection, QueryOrder, QuerySelect}; +use sea_orm::{ + entity::prelude::*, sea_query::Expr, DatabaseConnection, IntoActiveModel, QueryOrder, + QuerySelect, +}; use serde::{Deserialize, Serialize}; use svix_server_derive::ModelOut; @@ -707,6 +710,37 @@ async fn resend_webhook( Ok((StatusCode::ACCEPTED, Json(EmptyResponse {}))) } +async fn expunge_attempt_content( + State(AppState { ref db, .. }): State, + Path((_app_id, msg_id, attempt_id)): Path<( + ApplicationIdOrUid, + MessageIdOrUid, + MessageAttemptId, + )>, + permissions::OrganizationWithApplication { app }: permissions::OrganizationWithApplication, +) -> Result { + let msg = ctx!( + message::Entity::secure_find_by_id_or_uid(app.id, msg_id) + .one(db) + .await + )? + .ok_or_else(|| HttpError::not_found(None, Some("Message not found".to_string())))?; + + let mut attempt = ctx!( + messageattempt::Entity::secure_find_by_msg(msg.id) + .filter(messageattempt::Column::Id.eq(attempt_id)) + .one(db) + .await + )? + .ok_or_else(|| HttpError::not_found(None, Some("Message attempt not found".to_string())))? + .into_active_model(); + + attempt.response = sea_orm::Set("EXPUNGED".to_string()); + ctx!(attempt.update(db).await)?; + + Ok(StatusCode::NO_CONTENT) +} + pub fn router() -> ApiRouter { ApiRouter::new() // NOTE: [`list_messageattempts`] is deprecated @@ -720,6 +754,11 @@ pub fn router() -> ApiRouter { get(get_messageattempt), openapi_tag("Message Attempt"), ) + .api_route_with( + "/app/:app_id/msg/:msg_id/attempt/:attempt_id/content/", + delete(expunge_attempt_content), + openapi_tag("Message Attempt"), + ) .api_route_with( "/app/:app_id/msg/:msg_id/endpoint/", get(list_attempted_destinations), diff --git a/server/svix-server/src/v1/endpoints/message.rs b/server/svix-server/src/v1/endpoints/message.rs index 519ddb6c7..0efb3cd30 100644 --- a/server/svix-server/src/v1/endpoints/message.rs +++ b/server/svix-server/src/v1/endpoints/message.rs @@ -21,7 +21,7 @@ use crate::{ AppState, }; use aide::axum::{ - routing::{get, post}, + routing::{delete, get, post}, ApiRouter, }; use axum::{ @@ -31,8 +31,8 @@ use axum::{ use chrono::{DateTime, Duration, Utc}; use hyper::StatusCode; use schemars::JsonSchema; -use sea_orm::entity::prelude::*; use sea_orm::ActiveModelTrait; +use sea_orm::{entity::prelude::*, IntoActiveModel}; use sea_orm::{sea_query::Expr, ActiveValue::Set}; use serde::{Deserialize, Serialize}; @@ -305,6 +305,25 @@ async fn get_message( Ok(Json(msg_out)) } +async fn expunge_message_content( + State(AppState { ref db, .. }): State, + Path((_app_id, msg_id)): Path<(ApplicationIdOrUid, MessageIdOrUid)>, + permissions::OrganizationWithApplication { app }: permissions::OrganizationWithApplication, +) -> Result { + let mut msg = ctx!( + message::Entity::secure_find_by_id_or_uid(app.id, msg_id) + .one(db) + .await + )? + .ok_or_else(|| HttpError::not_found(None, None))? + .into_active_model(); + + msg.payload = Set(None); + ctx!(msg.update(db).await)?; + + Ok(StatusCode::NO_CONTENT) +} + pub fn router() -> ApiRouter { ApiRouter::new() .api_route_with( @@ -317,6 +336,11 @@ pub fn router() -> ApiRouter { get(get_message), openapi_tag("Message"), ) + .api_route_with( + "/app/:app_id/msg/:msg_id/content/", + delete(expunge_message_content), + openapi_tag("Message"), + ) } #[cfg(test)] diff --git a/server/svix-server/tests/e2e_attempt.rs b/server/svix-server/tests/e2e_attempt.rs index 2dfe258fd..16d0e79a6 100644 --- a/server/svix-server/tests/e2e_attempt.rs +++ b/server/svix-server/tests/e2e_attempt.rs @@ -22,11 +22,72 @@ use utils::{ get_msg_attempt_list_and_assert_count, }, get_default_test_config, run_with_retries, start_svix_server, start_svix_server_with_cfg, - TestReceiver, + IgnoredResponse, TestReceiver, }; use std::time::Duration; +#[tokio::test] +async fn test_expunge_attempt_response_body() { + let (client, _jh) = start_svix_server().await; + + let app_id = create_test_app(&client, "app1").await.unwrap().id; + + let sensitive_response_json = serde_json::json!({"sensitive":"data"}); + let mut receiver = TestReceiver::start_with_body( + axum::http::StatusCode::OK, + axum::Json(sensitive_response_json.clone()), + ); + + let endpoint_id = create_test_endpoint(&client, &app_id, &receiver.endpoint) + .await + .unwrap() + .id; + + let msg_id = create_test_message(&client, &app_id, serde_json::json!({"test": "data1"})) + .await + .unwrap() + .id; + + receiver.data_recv.recv().await; + + let attempt = run_with_retries(|| async { + let attempts: ListResponse = client + .get( + &format!("api/v1/app/{app_id}/attempt/endpoint/{endpoint_id}/"), + StatusCode::OK, + ) + .await + .unwrap(); + assert_eq!(1, attempts.data.len()); + Ok(attempts.data[0].clone()) + }) + .await + .unwrap(); + + let attempt_response: serde_json::Value = serde_json::from_str(&attempt.response).unwrap(); + assert_eq!(sensitive_response_json, attempt_response); + + let attempt_id = &attempt.id; + let _: IgnoredResponse = client + .delete( + &format!("api/v1/app/{app_id}/msg/{msg_id}/attempt/{attempt_id}/content/"), + StatusCode::NO_CONTENT, + ) + .await + .unwrap(); + + let attempt: MessageAttemptOut = client + .get( + &format!("api/v1/app/{app_id}/msg/{msg_id}/attempt/{attempt_id}/"), + StatusCode::OK, + ) + .await + .unwrap(); + + assert_eq!("EXPUNGED", &attempt.response); +} + #[tokio::test] async fn test_list_attempted_messages() { let (client, _jh) = start_svix_server().await; diff --git a/server/svix-server/tests/e2e_endpoint.rs b/server/svix-server/tests/e2e_endpoint.rs index 19b0f2180..131f210ac 100644 --- a/server/svix-server/tests/e2e_endpoint.rs +++ b/server/svix-server/tests/e2e_endpoint.rs @@ -1887,7 +1887,7 @@ async fn test_msg_channels_filter() { let app_id = create_test_app(&client, "app1").await.unwrap().id; - let _receiver = TestReceiver::start(StatusCode::OK); + let receiver = TestReceiver::start(StatusCode::OK); let ec = EventChannelSet(HashSet::from([EventChannel("tag1".to_owned())])); @@ -1897,6 +1897,7 @@ async fn test_msg_channels_filter() { &app_id, EndpointIn { channels, + url: Url::parse(&receiver.endpoint).unwrap(), ..default_test_endpoint() }, ) @@ -1920,8 +1921,6 @@ async fn test_msg_channels_filter() { .await .unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - let _list = get_msg_attempt_list_and_assert_count(&client, &app_id, &msg.id, expected_count) .await diff --git a/server/svix-server/tests/e2e_message.rs b/server/svix-server/tests/e2e_message.rs index 56169b80e..88dca48cf 100644 --- a/server/svix-server/tests/e2e_message.rs +++ b/server/svix-server/tests/e2e_message.rs @@ -17,7 +17,7 @@ mod utils; use utils::{ common_calls::{create_test_app, create_test_endpoint, message_in}, - run_with_retries, start_svix_server, TestReceiver, + run_with_retries, start_svix_server, IgnoredResponse, TestReceiver, }; #[tokio::test] @@ -391,3 +391,49 @@ async fn test_payload_retention_period() { assert_eq!(message.unwrap().payload, None); } + +#[tokio::test] +async fn test_expunge_message_payload() { + let (client, _jh) = start_svix_server().await; + + let app_id = create_test_app(&client, "testApp").await.unwrap().id; + + let payload = serde_json::json!({"sensitive": "data"}); + let msg: MessageOut = client + .post( + &format!("api/v1/app/{}/msg/", &app_id), + message_in(&app_id, payload.clone()).unwrap(), + StatusCode::ACCEPTED, + ) + .await + .unwrap(); + + assert_eq!(msg.payload, payload); + + let msg = client + .get::( + &format!("api/v1/app/{}/msg/{}/", &app_id, &msg.id), + StatusCode::OK, + ) + .await + .unwrap(); + assert_eq!(msg.payload, payload); + + let _: IgnoredResponse = client + .delete( + &format!("api/v1/app/{}/msg/{}/content/", &app_id, &msg.id), + StatusCode::NO_CONTENT, + ) + .await + .unwrap(); + + let msg = client + .get::( + &format!("api/v1/app/{}/msg/{}/", &app_id, &msg.id), + StatusCode::OK, + ) + .await + .unwrap(); + + assert_eq!(msg.payload, serde_json::json!({"expired": true})); +} diff --git a/server/svix-server/tests/utils/mod.rs b/server/svix-server/tests/utils/mod.rs index b91fe0c44..fe722641f 100644 --- a/server/svix-server/tests/utils/mod.rs +++ b/server/svix-server/tests/utils/mod.rs @@ -8,6 +8,7 @@ use std::{ }; use anyhow::{Context, Result}; +use axum::response::IntoResponse; use reqwest::{Client, RequestBuilder, StatusCode}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tokio::sync::mpsc; @@ -258,6 +259,7 @@ pub async fn start_svix_server_with_cfg_and_org_id( (TestClient::new(base_uri, &token), jh) } +#[derive(Debug)] pub struct TestReceiver { pub endpoint: String, pub jh: tokio::task::JoinHandle<()>, @@ -267,19 +269,27 @@ pub struct TestReceiver { } #[derive(Clone)] -pub struct TestAppState { +pub struct TestAppState { tx: mpsc::Sender, header_tx: mpsc::Sender, response_status_code: Arc>, + response_body: T, } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct ResponseStatusCode { pub status_code: axum::http::StatusCode, } impl TestReceiver { pub fn start(resp_with: axum::http::StatusCode) -> Self { + Self::start_with_body(resp_with, ()) + } + + pub fn start_with_body(resp_with: axum::http::StatusCode, body: T) -> Self + where + T: IntoResponse + Clone + Send + Sync + 'static, + { let listener = TcpListener::bind("127.0.0.1:0").unwrap(); let endpoint = format!("http://{}/", listener.local_addr().unwrap()); @@ -299,6 +309,7 @@ impl TestReceiver { tx, header_tx, response_status_code: response_status_code.clone(), + response_body: body, }) .into_make_service(); @@ -324,18 +335,22 @@ impl TestReceiver { } } -async fn test_receiver_route( +async fn test_receiver_route( axum::extract::State(TestAppState { tx, header_tx, response_status_code, - }): axum::extract::State, + response_body, + }): axum::extract::State>, headers: HeaderMap, axum::Json(json): axum::Json, -) -> axum::http::StatusCode { +) -> (axum::http::StatusCode, impl IntoResponse) { tx.send(json).await.unwrap(); header_tx.send(headers).await.unwrap(); - response_status_code.lock().unwrap().status_code + ( + response_status_code.lock().unwrap().status_code, + response_body, + ) } pub async fn run_with_retries(f: C) -> Result