Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/postgres/transaction/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ workspace = "../../../"
[dependencies]
sqlx = { path = "../../../", features = [ "postgres", "runtime-tokio-native-tls" ] }
futures = "0.3.1"
tokio = { version = "1.20.0", features = ["macros"]}
tokio = { version = "1.20.0", features = ["macros", "rt-multi-thread"]}
47 changes: 45 additions & 2 deletions examples/postgres/transaction/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
use sqlx::query;
use sqlx::Acquire;
use sqlx::Connection;

async fn insert_and_verify(
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
test_id: i64,
) -> Result<(), Box<dyn std::error::Error>> {
let connection = transaction.acquire().await?;
query!(
r#"INSERT INTO todos (id, description)
VALUES ( $1, $2 )
"#,
test_id,
"test todo"
)
.execute(&mut *transaction)
.execute(&mut *connection)
.await?;

// check that inserted todo can be fetched inside the uncommitted transaction
let _ = query!(r#"SELECT FROM todos WHERE id = $1"#, test_id)
.fetch_one(transaction)
.fetch_one(&mut *connection)
.await?;

Ok(())
Expand Down Expand Up @@ -60,6 +63,33 @@ async fn commit_example(
Ok(())
}

async fn insert_and_update_description(
pool: &sqlx::PgPool,
test_id: i64,
new_description: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let mut connection = pool.acquire().await?;

connection
.transaction(|tx| {
Box::pin(async {
insert_and_verify(tx, test_id).await.unwrap();
let conn = tx.acquire().await?;
query!(
r#"UPDATE todos set description = $1 where id = $2"#,
new_description,
test_id,
)
.execute(conn)
.await?;
Ok::<_, sqlx::Error>(())
})
})
.await?;

Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let conn_str =
Expand Down Expand Up @@ -100,5 +130,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

assert!(inserted_todo.is_ok());

let test_id = 2;

let description = String::from("hello world");
insert_and_update_description(&pool, test_id, &description).await?;

// check that inserted todo is visible outside the transaction after commit
let inserted_todo: Result<_, _> = query!(r#"SELECT * FROM todos WHERE id = $1"#, test_id)
.fetch_one(&pool)
.await;

assert!(inserted_todo.is_ok());
assert_eq!(inserted_todo.unwrap().description, description);

Ok(())
}
1 change: 1 addition & 0 deletions sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ once_cell = "1.9.0"
percent-encoding = "2.1.0"
regex = { version = "1.5.5", optional = true }
rsa = { version = "0.8.0", optional = true }
scoped-futures = { version = "0.1.3", features = ["std"] }
serde = { version = "1.0.132", features = ["derive", "rc"], optional = true }
serde_json = { version = "1.0.73", features = ["raw_value"], optional = true }
sha1 = { version = "0.10.1", default-features = false, optional = true }
Expand Down
27 changes: 18 additions & 9 deletions sqlx-core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::error::Error;
use crate::transaction::Transaction;
use futures_core::future::BoxFuture;
use log::LevelFilter;
use scoped_futures::ScopedBoxFuture;
use std::fmt::Debug;
use std::str::FromStr;
use std::time::Duration;
Expand Down Expand Up @@ -60,21 +61,29 @@ pub trait Connection: Send {
/// use sqlx::postgres::{PgConnection, PgRow};
/// use sqlx::Connection;
///
/// # pub async fn _f(conn: &mut PgConnection) -> sqlx::Result<Vec<PgRow>> {
/// conn.transaction(|txn| Box::pin(async move {
/// sqlx::query("select * from ..").fetch_all(&mut **txn).await
/// })).await
/// # pub async fn _f<'a>(conn: &mut PgConnection, foo: &'a str) -> sqlx::Result<Vec<PgRow>> {
/// conn.transaction(|txn| Box::pin(async move {
/// println!("{foo}");
/// sqlx::query("select * from ..").fetch_all(&mut **txn).await
/// })).await
/// # }
/// ```
fn transaction<'a, F, R, E>(&'a mut self, callback: F) -> BoxFuture<'a, Result<R, E>>
fn transaction<'a, 'b, 'fut, F, R, E>(
&'a mut self,
callback: F,
) -> BoxFuture<'fut, Result<R, E>>
where
for<'c> F: FnOnce(&'c mut Transaction<'_, Self::Database>) -> BoxFuture<'c, Result<R, E>>
+ 'a
for<'c> F: FnOnce(
&'c mut Transaction<'fut, Self::Database>,
) -> ScopedBoxFuture<'b, 'c, Result<R, E>>
+ 'b
+ Send
+ Sync,
Self: Sized,
R: Send,
E: From<Error> + Send,
R: Send + 'fut,
E: From<Error> + Send + 'fut,
'a: 'fut,
'b: 'fut,
{
Box::pin(async move {
let mut transaction = self.begin().await?;
Expand Down