Skip to content

Commit

Permalink
Add endpoints to erase payload and response
Browse files Browse the repository at this point in the history
  • Loading branch information
svix-andor committed Jan 27, 2023
1 parent e36c383 commit 9eedb41
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 15 deletions.
43 changes: 41 additions & 2 deletions server/svix-server/src/v1/endpoints/attempt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
AppState,
};
use aide::axum::{
routing::{get, post},
routing::{delete, get, post},
ApiRouter,
};
use axum::{
Expand All @@ -37,7 +37,10 @@ use chrono::{DateTime, Utc};

use hyper::StatusCode;
use schemars::JsonSchema;
use sea_orm::{entity::prelude::*, sea_query::Expr, DatabaseConnection, QueryOrder, QuerySelect};
use sea_orm::{
entity::prelude::*, sea_query::Expr, DatabaseConnection, IntoActiveModel, QueryOrder,
QuerySelect,
};
use serde::{Deserialize, Serialize};

use svix_server_derive::ModelOut;
Expand Down Expand Up @@ -707,6 +710,37 @@ async fn resend_webhook(
Ok((StatusCode::ACCEPTED, Json(EmptyResponse {})))
}

async fn expunge_attempt_content(
State(AppState { ref db, .. }): State<AppState>,
Path((_app_id, msg_id, attempt_id)): Path<(
ApplicationIdOrUid,
MessageIdOrUid,
MessageAttemptId,
)>,
permissions::OrganizationWithApplication { app }: permissions::OrganizationWithApplication,
) -> Result<StatusCode> {
let msg = ctx!(
message::Entity::secure_find_by_id_or_uid(app.id, msg_id)
.one(db)
.await
)?
.ok_or_else(|| HttpError::not_found(None, Some("Message not found".to_string())))?;

let mut attempt = ctx!(
messageattempt::Entity::secure_find_by_msg(msg.id)
.filter(messageattempt::Column::Id.eq(attempt_id))
.one(db)
.await
)?
.ok_or_else(|| HttpError::not_found(None, Some("Message attempt not found".to_string())))?
.into_active_model();

attempt.response = sea_orm::Set("EXPUNGED".to_string());
ctx!(attempt.update(db).await)?;

Ok(StatusCode::NO_CONTENT)
}

pub fn router() -> ApiRouter<AppState> {
ApiRouter::new()
// NOTE: [`list_messageattempts`] is deprecated
Expand All @@ -720,6 +754,11 @@ pub fn router() -> ApiRouter<AppState> {
get(get_messageattempt),
openapi_tag("Message Attempt"),
)
.api_route_with(
"/app/:app_id/msg/:msg_id/attempt/:attempt_id/content/",
delete(expunge_attempt_content),
openapi_tag("Message Attempt"),
)
.api_route_with(
"/app/:app_id/msg/:msg_id/endpoint/",
get(list_attempted_destinations),
Expand Down
28 changes: 26 additions & 2 deletions server/svix-server/src/v1/endpoints/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
AppState,
};
use aide::axum::{
routing::{get, post},
routing::{delete, get, post},
ApiRouter,
};
use axum::{
Expand All @@ -31,8 +31,8 @@ use axum::{
use chrono::{DateTime, Duration, Utc};
use hyper::StatusCode;
use schemars::JsonSchema;
use sea_orm::entity::prelude::*;
use sea_orm::ActiveModelTrait;
use sea_orm::{entity::prelude::*, IntoActiveModel};
use sea_orm::{sea_query::Expr, ActiveValue::Set};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -305,6 +305,25 @@ async fn get_message(
Ok(Json(msg_out))
}

async fn expunge_message_content(
State(AppState { ref db, .. }): State<AppState>,
Path((_app_id, msg_id)): Path<(ApplicationIdOrUid, MessageIdOrUid)>,
permissions::OrganizationWithApplication { app }: permissions::OrganizationWithApplication,
) -> Result<StatusCode> {
let mut msg = ctx!(
message::Entity::secure_find_by_id_or_uid(app.id, msg_id)
.one(db)
.await
)?
.ok_or_else(|| HttpError::not_found(None, None))?
.into_active_model();

msg.payload = Set(None);
ctx!(msg.update(db).await)?;

Ok(StatusCode::NO_CONTENT)
}

pub fn router() -> ApiRouter<AppState> {
ApiRouter::new()
.api_route_with(
Expand All @@ -317,6 +336,11 @@ pub fn router() -> ApiRouter<AppState> {
get(get_message),
openapi_tag("Message"),
)
.api_route_with(
"/app/:app_id/msg/:msg_id/content/",
delete(expunge_message_content),
openapi_tag("Message"),
)
}

#[cfg(test)]
Expand Down
63 changes: 62 additions & 1 deletion server/svix-server/tests/e2e_attempt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,72 @@ use utils::{
get_msg_attempt_list_and_assert_count,
},
get_default_test_config, run_with_retries, start_svix_server, start_svix_server_with_cfg,
TestReceiver,
IgnoredResponse, TestReceiver,
};

use std::time::Duration;

#[tokio::test]
async fn test_expunge_attempt_response_body() {
let (client, _jh) = start_svix_server().await;

let app_id = create_test_app(&client, "app1").await.unwrap().id;

let sensitive_response_json = serde_json::json!({"sensitive":"data"});
let mut receiver = TestReceiver::start_with_body(
axum::http::StatusCode::OK,
axum::Json(sensitive_response_json.clone()),
);

let endpoint_id = create_test_endpoint(&client, &app_id, &receiver.endpoint)
.await
.unwrap()
.id;

let msg_id = create_test_message(&client, &app_id, serde_json::json!({"test": "data1"}))
.await
.unwrap()
.id;

receiver.data_recv.recv().await;

let attempt = run_with_retries(|| async {
let attempts: ListResponse<MessageAttemptOut> = client
.get(
&format!("api/v1/app/{app_id}/attempt/endpoint/{endpoint_id}/"),
StatusCode::OK,
)
.await
.unwrap();
assert_eq!(1, attempts.data.len());
Ok(attempts.data[0].clone())
})
.await
.unwrap();

let attempt_response: serde_json::Value = serde_json::from_str(&attempt.response).unwrap();
assert_eq!(sensitive_response_json, attempt_response);

let attempt_id = &attempt.id;
let _: IgnoredResponse = client
.delete(
&format!("api/v1/app/{app_id}/msg/{msg_id}/attempt/{attempt_id}/content/"),
StatusCode::NO_CONTENT,
)
.await
.unwrap();

let attempt: MessageAttemptOut = client
.get(
&format!("api/v1/app/{app_id}/msg/{msg_id}/attempt/{attempt_id}/"),
StatusCode::OK,
)
.await
.unwrap();

assert_eq!("EXPUNGED", &attempt.response);
}

#[tokio::test]
async fn test_list_attempted_messages() {
let (client, _jh) = start_svix_server().await;
Expand Down
5 changes: 2 additions & 3 deletions server/svix-server/tests/e2e_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1887,7 +1887,7 @@ async fn test_msg_channels_filter() {

let app_id = create_test_app(&client, "app1").await.unwrap().id;

let _receiver = TestReceiver::start(StatusCode::OK);
let receiver = TestReceiver::start(StatusCode::OK);

let ec = EventChannelSet(HashSet::from([EventChannel("tag1".to_owned())]));

Expand All @@ -1897,6 +1897,7 @@ async fn test_msg_channels_filter() {
&app_id,
EndpointIn {
channels,
url: Url::parse(&receiver.endpoint).unwrap(),
..default_test_endpoint()
},
)
Expand All @@ -1920,8 +1921,6 @@ async fn test_msg_channels_filter() {
.await
.unwrap();

tokio::time::sleep(Duration::from_millis(100)).await;

let _list =
get_msg_attempt_list_and_assert_count(&client, &app_id, &msg.id, expected_count)
.await
Expand Down
48 changes: 47 additions & 1 deletion server/svix-server/tests/e2e_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod utils;

use utils::{
common_calls::{create_test_app, create_test_endpoint, message_in},
run_with_retries, start_svix_server, TestReceiver,
run_with_retries, start_svix_server, IgnoredResponse, TestReceiver,
};

#[tokio::test]
Expand Down Expand Up @@ -391,3 +391,49 @@ async fn test_payload_retention_period() {

assert_eq!(message.unwrap().payload, None);
}

#[tokio::test]
async fn test_expunge_message_payload() {
let (client, _jh) = start_svix_server().await;

let app_id = create_test_app(&client, "testApp").await.unwrap().id;

let payload = serde_json::json!({"sensitive": "data"});
let msg: MessageOut = client
.post(
&format!("api/v1/app/{}/msg/", &app_id),
message_in(&app_id, payload.clone()).unwrap(),
StatusCode::ACCEPTED,
)
.await
.unwrap();

assert_eq!(msg.payload, payload);

let msg = client
.get::<MessageOut>(
&format!("api/v1/app/{}/msg/{}/", &app_id, &msg.id),
StatusCode::OK,
)
.await
.unwrap();
assert_eq!(msg.payload, payload);

let _: IgnoredResponse = client
.delete(
&format!("api/v1/app/{}/msg/{}/content/", &app_id, &msg.id),
StatusCode::NO_CONTENT,
)
.await
.unwrap();

let msg = client
.get::<MessageOut>(
&format!("api/v1/app/{}/msg/{}/", &app_id, &msg.id),
StatusCode::OK,
)
.await
.unwrap();

assert_eq!(msg.payload, serde_json::json!({"expired": true}));
}
27 changes: 21 additions & 6 deletions server/svix-server/tests/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
};

use anyhow::{Context, Result};
use axum::response::IntoResponse;
use reqwest::{Client, RequestBuilder, StatusCode};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::sync::mpsc;
Expand Down Expand Up @@ -258,6 +259,7 @@ pub async fn start_svix_server_with_cfg_and_org_id(
(TestClient::new(base_uri, &token), jh)
}

#[derive(Debug)]
pub struct TestReceiver {
pub endpoint: String,
pub jh: tokio::task::JoinHandle<()>,
Expand All @@ -267,19 +269,27 @@ pub struct TestReceiver {
}

#[derive(Clone)]
pub struct TestAppState {
pub struct TestAppState<T: IntoResponse + Clone> {
tx: mpsc::Sender<serde_json::Value>,
header_tx: mpsc::Sender<HeaderMap>,
response_status_code: Arc<Mutex<ResponseStatusCode>>,
response_body: T,
}

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct ResponseStatusCode {
pub status_code: axum::http::StatusCode,
}

impl TestReceiver {
pub fn start(resp_with: axum::http::StatusCode) -> Self {
Self::start_with_body(resp_with, ())
}

pub fn start_with_body<T>(resp_with: axum::http::StatusCode, body: T) -> Self
where
T: IntoResponse + Clone + Send + Sync + 'static,
{
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let endpoint = format!("http://{}/", listener.local_addr().unwrap());

Expand All @@ -299,6 +309,7 @@ impl TestReceiver {
tx,
header_tx,
response_status_code: response_status_code.clone(),
response_body: body,
})
.into_make_service();

Expand All @@ -324,18 +335,22 @@ impl TestReceiver {
}
}

async fn test_receiver_route(
async fn test_receiver_route<T: IntoResponse + Clone>(
axum::extract::State(TestAppState {
tx,
header_tx,
response_status_code,
}): axum::extract::State<TestAppState>,
response_body,
}): axum::extract::State<TestAppState<T>>,
headers: HeaderMap,
axum::Json(json): axum::Json<serde_json::Value>,
) -> axum::http::StatusCode {
) -> (axum::http::StatusCode, impl IntoResponse) {
tx.send(json).await.unwrap();
header_tx.send(headers).await.unwrap();
response_status_code.lock().unwrap().status_code
(
response_status_code.lock().unwrap().status_code,
response_body,
)
}

pub async fn run_with_retries<O, F, C>(f: C) -> Result<O>
Expand Down

0 comments on commit 9eedb41

Please sign in to comment.