Skip to content

Commit

Permalink
Recover Failed Webhooks use configurable until
Browse files Browse the repository at this point in the history
Our API docs support an optional `until` parameter. This adds
support for it in the backend.
  • Loading branch information
jaymell authored and svix-james committed Jun 26, 2024
1 parent 4d5ec73 commit 464abe1
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 10 deletions.
5 changes: 5 additions & 0 deletions server/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -1985,6 +1985,11 @@
"since": {
"format": "date-time",
"type": "string"
},
"until": {
"format": "date-time",
"nullable": true,
"type": "string"
}
},
"required": [
Expand Down
1 change: 1 addition & 0 deletions server/svix-server/src/v1/endpoints/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ pub struct EndpointSecretOut {
#[serde(rename_all = "camelCase")]
pub struct RecoverIn {
pub since: DateTime<Utc>,
pub until: Option<DateTime<Utc>>,
}

fn endpoint_headers_example() -> HashMap<&'static str, &'static str> {
Expand Down
24 changes: 15 additions & 9 deletions server/svix-server/src/v1/endpoints/endpoint/recovery.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use axum::extract::{Path, State};
use chrono::{DateTime, Utc};
use chrono::{DateTime, Duration, Utc};
use schemars::JsonSchema;
use sea_orm::{entity::prelude::*, QueryOrder, QuerySelect};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -27,6 +27,7 @@ async fn bulk_recover_failed_messages(
app: application::Model,
endp: endpoint::Model,
since: DateTime<Utc>,
until: DateTime<Utc>,
) -> Result<()> {
const RECOVER_LIMIT: u64 = 10_000;
const BATCH_SIZE: u64 = 100;
Expand All @@ -36,6 +37,7 @@ async fn bulk_recover_failed_messages(
loop {
let mut query = messagedestination::Entity::secure_find_by_endpoint(endp.id.clone())
.filter(messagedestination::Column::Id.gte(MessageEndpointId::start_id(since)))
.filter(messagedestination::Column::Id.lt(MessageEndpointId::start_id(until)))
.filter(messagedestination::Column::Status.eq(MessageStatus::Fail))
.order_by_asc(messagedestination::Column::Id)
.limit(RECOVER_LIMIT);
Expand Down Expand Up @@ -99,16 +101,20 @@ pub(super) async fn recover_failed_webhooks(
}): State<AppState>,
Path(ApplicationEndpointPath { endpoint_id, .. }): Path<ApplicationEndpointPath>,
permissions::Application { app }: permissions::Application,
ValidatedJson(data): ValidatedJson<RecoverIn>,
ValidatedJson(RecoverIn { since, until }): ValidatedJson<RecoverIn>,
) -> Result<JsonStatus<202, RecoverOut>> {
let until = until.unwrap_or_else(Utc::now);

// Add five minutes so that people can easily just do `now() - two_weeks` without having to worry about clock sync
let timeframe = chrono::Duration::days(14);
let timeframe = timeframe + chrono::Duration::minutes(5);
let max_timeframe = Duration::days(14) + Duration::minutes(5);

if data.since < Utc::now() - timeframe {
if since < until - max_timeframe {
return Err(HttpError::unprocessable_entity(vec![ValidationErrorItem {
loc: vec!["body".to_owned(), "since".to_owned()],
msg: "Cannot recover messages more than 14 days old.".to_owned(),
msg: format!(
"Cannot recover more than {} days of messages",
max_timeframe.num_days()
),
ty: "value_error".to_owned(),
}])
.into());
Expand All @@ -121,9 +127,9 @@ pub(super) async fn recover_failed_webhooks(

let db = db.clone();
let queue_tx = queue_tx.clone();
tokio::spawn(
async move { bulk_recover_failed_messages(db, queue_tx, app, endp, data.since).await },
);
tokio::spawn(async move {
bulk_recover_failed_messages(db, queue_tx, app, endp, since, until).await
});

Ok(JsonStatus(RecoverOut {
id: QueueBackgroundTaskId::new(None, None),
Expand Down
1 change: 1 addition & 0 deletions server/svix-server/tests/it/e2e_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,7 @@ async fn test_recovery_should_fail_if_start_time_too_old() {
&format!("api/v1/app/{app_id}/endpoint/{endp_id}/recover/"),
RecoverIn {
since: Utc::now() - chrono::Duration::weeks(3),
until: None,
},
StatusCode::UNPROCESSABLE_ENTITY,
)
Expand Down
2 changes: 1 addition & 1 deletion server/svix-server/tests/it/utils/common_calls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ pub async fn common_test_list<

pub async fn recover_webhooks(client: &TestClient, since: DateTime<Utc>, url: &str) {
client
.post_without_response(url, RecoverIn { since }, StatusCode::ACCEPTED)
.post_without_response(url, RecoverIn { since, until: None }, StatusCode::ACCEPTED)
.await
.unwrap();
}
Expand Down

0 comments on commit 464abe1

Please sign in to comment.