Skip to content

Commit

Permalink
Implement bidirectional pagination for endpoints (#812)
Browse files Browse the repository at this point in the history
With this change `Endpoint` also returns a `prev_iterator` in its list
response, and accepts such an iterator.
  • Loading branch information
svix-andor authored Feb 7, 2023
2 parents 0fbf40f + f5543f6 commit b1536f6
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 33 deletions.
24 changes: 12 additions & 12 deletions server/svix-server/src/v1/endpoints/attempt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use crate::{
v1::{
endpoints::message::MessageOut,
utils::{
apply_pagination, iterator_from_before_or_after, openapi_tag, ApplicationEndpointPath,
ApplicationMsgAttemptPath, ApplicationMsgEndpointPath, ApplicationMsgPath,
EmptyResponse, EventTypesQuery, ListResponse, ModelOut, PaginationLimit,
ReversibleIterator, ValidatedQuery,
apply_pagination_desc, iterator_from_before_or_after, openapi_tag,
ApplicationEndpointPath, ApplicationMsgAttemptPath, ApplicationMsgEndpointPath,
ApplicationMsgPath, EmptyResponse, EventTypesQuery, ListResponse, ModelOut,
PaginationLimit, ReversibleIterator, ValidatedQuery,
},
},
AppState,
Expand Down Expand Up @@ -185,7 +185,7 @@ async fn list_attempted_messages(
let iterator = iterator_from_before_or_after(msg_dest_iterator, before, after);
let is_prev = matches!(iterator, Some(ReversibleIterator::Prev(_)));

let dests_and_msgs = apply_pagination(
let dests_and_msgs = apply_pagination_desc(
dests_and_msgs,
messagedestination::Column::Id,
limit,
Expand All @@ -211,7 +211,7 @@ async fn list_attempted_messages(
.collect::<Result<_>>()?
};

Ok(Json(AttemptedMessageOut::list_response(
Ok(Json(AttemptedMessageOut::list_response_desc(
out,
limit as usize,
is_prev,
Expand Down Expand Up @@ -330,7 +330,7 @@ async fn list_attempts_by_endpoint(

let iterator = iterator_from_before_or_after(pagination.iterator, before, after);
let is_prev = matches!(iterator, Some(ReversibleIterator::Prev(_)));
let query = apply_pagination(query, messageattempt::Column::Id, limit, iterator);
let query = apply_pagination_desc(query, messageattempt::Column::Id, limit, iterator);

let out = if is_prev {
ctx!(query.all(db).await)?
Expand All @@ -345,7 +345,7 @@ async fn list_attempts_by_endpoint(
.collect()
};

Ok(Json(MessageAttemptOut::list_response(
Ok(Json(MessageAttemptOut::list_response_desc(
out,
limit as usize,
is_prev,
Expand Down Expand Up @@ -414,7 +414,7 @@ async fn list_attempts_by_msg(

let iterator = iterator_from_before_or_after(pagination.iterator, before, after);
let is_prev = matches!(iterator, Some(ReversibleIterator::Prev(_)));
let query = apply_pagination(query, messageattempt::Column::Id, limit, iterator);
let query = apply_pagination_desc(query, messageattempt::Column::Id, limit, iterator);
let out = if is_prev {
ctx!(query.all(db).await)?
.into_iter()
Expand All @@ -428,7 +428,7 @@ async fn list_attempts_by_msg(
.collect()
};

Ok(Json(MessageAttemptOut::list_response(
Ok(Json(MessageAttemptOut::list_response_desc(
out,
limit as usize,
is_prev,
Expand Down Expand Up @@ -613,7 +613,7 @@ async fn list_messageattempts(

let iterator = iterator_from_before_or_after(pagination.iterator, before, after);
let is_prev = matches!(iterator, Some(ReversibleIterator::Prev(_)));
let query = apply_pagination(query, messageattempt::Column::Id, limit, iterator);
let query = apply_pagination_desc(query, messageattempt::Column::Id, limit, iterator);
let out = if is_prev {
ctx!(query.all(db).await)?
.into_iter()
Expand All @@ -627,7 +627,7 @@ async fn list_messageattempts(
.collect()
};

Ok(Json(MessageAttemptOut::list_response(
Ok(Json(MessageAttemptOut::list_response_desc(
out,
limit as usize,
false,
Expand Down
26 changes: 14 additions & 12 deletions server/svix-server/src/v1/endpoints/endpoint/crud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use axum::{
Json,
};
use hyper::StatusCode;
use sea_orm::{entity::prelude::*, ActiveValue::Set, QueryOrder, TransactionTrait};
use sea_orm::{entity::prelude::*, ActiveValue::Set, TransactionTrait};
use sea_orm::{ActiveModelTrait, DatabaseConnection, QuerySelect};
use url::Url;

Expand All @@ -21,29 +21,30 @@ use crate::{
db::models::{application, endpoint, endpointmetadata, eventtype},
error::{HttpError, Result, ValidationErrorItem},
v1::utils::{
apply_pagination_asc,
patch::{patch_field_non_nullable, UnrequiredField, UnrequiredNullableField},
ApplicationEndpointPath, EmptyResponse, ListResponse, ModelIn, ModelOut, Pagination,
PaginationLimit, ValidatedJson, ValidatedQuery,
PaginationLimit, ReversibleIterator, ValidatedJson, ValidatedQuery,
},
AppState,
};
use hack::EventTypeNameResult;

pub(super) async fn list_endpoints(
State(AppState { ref db, .. }): State<AppState>,
pagination: ValidatedQuery<Pagination<EndpointId>>,
ValidatedQuery(pagination): ValidatedQuery<Pagination<ReversibleIterator<EndpointId>>>,
permissions::Application { app }: permissions::Application,
) -> Result<Json<ListResponse<EndpointOut>>> {
let PaginationLimit(limit) = pagination.limit;
let iterator = pagination.iterator.clone();
let iterator = pagination.iterator;
let is_prev = matches!(iterator, Some(ReversibleIterator::Prev(_)));

let mut query = endpoint::Entity::secure_find(app.id)
.order_by_asc(endpoint::Column::Id)
.limit(limit + 1);

if let Some(iterator) = iterator {
query = query.filter(endpoint::Column::Id.gt(iterator))
}
let query = apply_pagination_asc(
endpoint::Entity::secure_find(app.id),
endpoint::Column::Id,
limit,
iterator,
);

let results = ctx!(
query
Expand All @@ -58,9 +59,10 @@ pub(super) async fn list_endpoints(
})
.collect();

Ok(Json(EndpointOut::list_response_no_prev(
Ok(Json(EndpointOut::list_response_asc(
results,
limit as usize,
is_prev,
)))
}

Expand Down
10 changes: 7 additions & 3 deletions server/svix-server/src/v1/endpoints/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
error::{HttpError, Result},
queue::MessageTaskBatch,
v1::utils::{
apply_pagination, iterator_from_before_or_after, openapi_tag, validation_error,
apply_pagination_desc, iterator_from_before_or_after, openapi_tag, validation_error,
ApplicationMsgPath, EventTypesQuery, ListResponse, ModelIn, ModelOut, PaginationLimit,
ReversibleIterator, ValidatedJson, ValidatedQuery,
},
Expand Down Expand Up @@ -196,7 +196,7 @@ async fn list_messages(
let iterator = iterator_from_before_or_after(pagination.iterator, before, after);
let is_prev = matches!(iterator, Some(ReversibleIterator::Prev(_)));

let query = apply_pagination(query, message::Column::Id, limit, iterator);
let query = apply_pagination_desc(query, message::Column::Id, limit, iterator);
let into = |x: message::Model| {
if with_content {
x.into()
Expand All @@ -215,7 +215,11 @@ async fn list_messages(
ctx!(query.all(db).await)?.into_iter().map(into).collect()
};

Ok(Json(MessageOut::list_response(out, limit as usize, false)))
Ok(Json(MessageOut::list_response_desc(
out,
limit as usize,
false,
)))
}

#[derive(Debug, Deserialize, Validate, JsonSchema)]
Expand Down
77 changes: 71 additions & 6 deletions server/svix-server/src/v1/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub fn iterator_from_before_or_after<I: BaseId<Output = I> + Validate>(
}

/// Applies sorting and filtration to a query from its iterator, sort column, and limit
pub fn apply_pagination<
pub fn apply_pagination_desc<
Q: QuerySelect + QueryOrder + QueryFilter,
C: ColumnTrait,
I: BaseId<Output = I> + Validate + Into<sea_orm::Value>,
Expand All @@ -175,6 +175,34 @@ pub fn apply_pagination<
}
}

/// Applies sorting and filtration to a query from its iterator, sort column, and limit,
/// in an ascending ordering, i.e. one where `Normal` corresponds to starting at the lowest
/// and counting up, and `Prev` corresponds to starting at the highest and counting down.
pub fn apply_pagination_asc<
Q: QuerySelect + QueryOrder + QueryFilter,
C: ColumnTrait,
I: BaseId<Output = I> + Validate + Into<sea_orm::Value>,
>(
query: Q,
sort_column: C,
limit: u64,
iterator: Option<ReversibleIterator<I>>,
) -> Q {
let query = query.limit(limit + 1);

match iterator {
Some(ReversibleIterator::Prev(id)) => {
query.order_by_desc(sort_column).filter(sort_column.lt(id))
}

Some(ReversibleIterator::Normal(id)) => {
query.order_by_asc(sort_column).filter(sort_column.gt(id))
}

None => query.order_by_asc(sort_column),
}
}

#[derive(Serialize)]
pub struct EmptyResponse {}

Expand All @@ -194,16 +222,29 @@ pub trait ModelIn {
fn update_model(self, model: &mut Self::ActiveModel);
}

#[derive(PartialEq, Eq)]
enum ListOrdering {
Ascending,
Descending,
}

#[derive(PartialEq, Eq)]
enum IteratorDirection {
Next,
Previous,
}

fn list_response_inner<T: ModelOut>(
mut data: Vec<T>,
limit: usize,
is_prev_iter: bool,
ordering: ListOrdering,
iter_direction: IteratorDirection,
supports_prev_iterator: bool,
) -> ListResponse<T> {
let done = data.len() <= limit;

if data.len() > limit {
if is_prev_iter {
if iter_direction == IteratorDirection::Previous {
data = data.drain(data.len() - limit..).collect();
} else {
data.truncate(limit);
Expand All @@ -217,6 +258,10 @@ fn list_response_inner<T: ModelOut>(
};
let iterator = data.last().map(|x| x.id_copy());

if ordering == ListOrdering::Ascending && iter_direction == IteratorDirection::Previous {
data.reverse();
}

ListResponse {
data,
iterator,
Expand All @@ -228,12 +273,32 @@ fn list_response_inner<T: ModelOut>(
pub trait ModelOut: Clone {
fn id_copy(&self) -> String;

fn list_response(data: Vec<Self>, limit: usize, is_prev_iter: bool) -> ListResponse<Self> {
list_response_inner(data, limit, is_prev_iter, true)
fn list_response_asc(data: Vec<Self>, limit: usize, is_prev_iter: bool) -> ListResponse<Self> {
let direction = if is_prev_iter {
IteratorDirection::Previous
} else {
IteratorDirection::Next
};
list_response_inner(data, limit, ListOrdering::Ascending, direction, true)
}

fn list_response_desc(data: Vec<Self>, limit: usize, is_prev_iter: bool) -> ListResponse<Self> {
let direction = if is_prev_iter {
IteratorDirection::Previous
} else {
IteratorDirection::Next
};
list_response_inner(data, limit, ListOrdering::Descending, direction, true)
}

fn list_response_no_prev(data: Vec<Self>, limit: usize) -> ListResponse<Self> {
list_response_inner(data, limit, false, false)
list_response_inner(
data,
limit,
ListOrdering::Ascending,
IteratorDirection::Next,
false,
)
}
}

Expand Down
62 changes: 62 additions & 0 deletions server/svix-server/tests/e2e_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,68 @@ async fn test_list() {
.unwrap();
}

#[tokio::test]
async fn test_endpoint_list_ordering() {
let (client, _jh) = start_svix_server().await;

let app_id = create_test_app(&client, "App1").await.unwrap().id;

for i in 0..5 {
create_test_endpoint(&client, &app_id, &format!("https://test.url/{i}"))
.await
.unwrap();
// Sleep to account for ksuid 4ms resolution
tokio::time::sleep(Duration::from_millis(5)).await;
}

let first_list: ListResponse<EndpointOut> = client
.get(
&format!("api/v1/app/{}/endpoint/?limit=2", &app_id),
StatusCode::OK,
)
.await
.unwrap();

assert_eq!(
first_list.data.first().unwrap().ep.url,
"https://test.url/0"
);
assert_eq!(first_list.data.last().unwrap().ep.url, "https://test.url/1");
assert!(!first_list.done);

let list: ListResponse<EndpointOut> = client
.get(
&format!(
"api/v1/app/{}/endpoint/?limit=2&iterator={}",
&app_id,
first_list.iterator.unwrap()
),
StatusCode::OK,
)
.await
.unwrap();

assert_eq!(list.data.first().unwrap().ep.url, "https://test.url/2");
assert_eq!(list.data.last().unwrap().ep.url, "https://test.url/3");
assert!(!list.done);

let list: ListResponse<EndpointOut> = client
.get(
&format!(
"api/v1/app/{}/endpoint/?iterator={}",
&app_id,
list.prev_iterator.unwrap()
),
StatusCode::OK,
)
.await
.unwrap();

assert_eq!(list.data.first().unwrap().ep.url, "https://test.url/0");
assert_eq!(list.data.last().unwrap().ep.url, "https://test.url/1");
assert!(list.done);
}

/// Tests that there is at most one endpoint with a single UID for all endpoints associated with
/// any application
#[tokio::test]
Expand Down

0 comments on commit b1536f6

Please sign in to comment.