Skip to content

Commit

Permalink
Port TS test to Rust
Browse files Browse the repository at this point in the history
  • Loading branch information
aqrln committed Oct 17, 2024
1 parent 8a24494 commit 66947c4
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod max_integer;
mod prisma_10098;
mod prisma_10935;
mod prisma_11750;
mod prisma_11789;
mod prisma_12572;
mod prisma_12929;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
use query_engine_tests::*;

/// Regression test for <https://github.com/prisma/prisma/issues/11750>.
///
/// See also <https://github.com/prisma/prisma/pull/12563> and
/// <https://github.com/prisma/prisma-engines/pull/2811>.
///
/// This is a port of the TypeScript test from the client test suite.
///
/// The test creates a user and then tries to update the same row in multiple concurrent
/// transactions. We don't assert that most operations succeed and merely log the errors happening
/// during update or commit, as those are expected to happen. We do fail the test if creating the
/// user fails, or if we fail to start a transaction, as those operations are expected to succeed.
///
/// What we really test here, though, is that:
///
/// 1. Query engine must not deadlock (leading to the test never finishing).
///
/// 2. We don't exhaust all available database connections leaving them locked on the database side
/// leading for all subsequent tests to fail.
///
#[test_suite(schema(user), exclude(Sqlite))]
mod prisma_11750 {
use std::sync::Arc;
use tokio::task::JoinSet;

#[connector_test]
async fn test_itx_concurrent_updates_single_thread(runner: Runner) -> TestResult<()> {
let runner = Arc::new(runner);

create_user(&runner, 1, "x").await?;

for _ in 0..10 {
tokio::try_join!(
update_user(Arc::clone(&runner), "a"),
update_user(Arc::clone(&runner), "b"),
update_user(Arc::clone(&runner), "c"),
update_user(Arc::clone(&runner), "d"),
update_user(Arc::clone(&runner), "e"),
update_user(Arc::clone(&runner), "f"),
update_user(Arc::clone(&runner), "g"),
update_user(Arc::clone(&runner), "h"),
update_user(Arc::clone(&runner), "i"),
update_user(Arc::clone(&runner), "j"),
)?;
}

create_user(&runner, 2, "y").await?;

Ok(())
}

#[connector_test]
async fn test_itx_concurrent_updates_multi_thread(runner: Runner) -> TestResult<()> {
let runner = Arc::new(runner);

create_user(&runner, 1, "x").await?;

for _ in 0..10 {
let mut set = JoinSet::new();

for email in ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"] {
set.spawn(update_user(Arc::clone(&runner), email));
}

while let Some(handle) = set.join_next().await {
handle.expect("task panicked or canceled")?;
}
}

create_user(&runner, 2, "y").await?;

Ok(())
}

async fn create_user(runner: &Runner, id: u32, email: &str) -> TestResult<()> {
run_query!(
&runner,
format!(
r#"mutation {{
createOneUser(
data: {{
id: {id},
first_name: "Alice",
last_name: "Margatroid",
email: "{email}"
}}
) {{ id }}
}}"#
)
);

Ok(())
}

async fn update_user(runner: Arc<Runner>, new_email: &str) -> TestResult<()> {
let tx_id = runner.start_tx(2000, 25, None).await?;

let result = runner
.query_in_tx(
&tx_id,
format!(
r#"mutation {{
updateOneUser(
where: {{ id: 1 }},
data: {{ email: "{new_email}" }}
) {{ id }}
}}"#
),
)
.await;

if let Err(err) = result {
tracing::error!(%err, "query error");
}

let result = runner.commit_tx(tx_id).await?;

if let Err(err) = result {
tracing::error!(?err, "commit error");
}

Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,15 @@ impl Runner {
})
}

pub async fn query<T>(&self, query: T) -> TestResult<QueryResult>
pub async fn query(&self, query: impl Into<String>) -> TestResult<QueryResult> {
self.query_with_maybe_tx_id(self.current_tx_id.as_ref(), query).await
}

pub async fn query_in_tx(&self, tx_id: &TxId, query: impl Into<String>) -> TestResult<QueryResult> {
self.query_with_maybe_tx_id(Some(tx_id), query).await
}

async fn query_with_maybe_tx_id<T>(&self, tx_id: Option<&TxId>, query: T) -> TestResult<QueryResult>
where
T: Into<String>,
{
Expand All @@ -316,7 +324,7 @@ impl Runner {
RunnerExecutor::Builtin(e) => e,
RunnerExecutor::External(external) => match JsonRequest::from_graphql(&query, self.query_schema()) {
Ok(json_query) => {
let mut response = external.query(json_query, self.current_tx_id.as_ref()).await?;
let mut response = external.query(json_query, tx_id).await?;
response.detag();
return Ok(response);
}
Expand Down Expand Up @@ -353,7 +361,7 @@ impl Runner {
}
};

let response = handler.handle(request_body, self.current_tx_id.clone(), None).await;
let response = handler.handle(request_body, tx_id.cloned(), None).await;

let result: QueryResult = match self.protocol {
EngineProtocol::Json => JsonResponse::from_graphql(response).into(),
Expand Down

0 comments on commit 66947c4

Please sign in to comment.