Skip to content

Commit

Permalink
Expose the Sqlite immediate_transaction and exclusive_transaction
Browse files Browse the repository at this point in the history
… functions from the sync connection wrapper
  • Loading branch information
weiznich committed Jul 19, 2024
1 parent 6437e59 commit 622fa21
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 3 deletions.
11 changes: 10 additions & 1 deletion src/doctest_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ cfg_if::cfg_if! {
accent VARCHAR(255) DEFAULT 'Blue'
)").execute(connection).await.unwrap();

connection.begin_test_transaction().await.unwrap();
diesel::sql_query("INSERT INTO users (name) VALUES ('Sean'), ('Tess')").execute(connection).await.unwrap();
diesel::sql_query("INSERT INTO posts (user_id, title) VALUES
(1, 'My first post'),
Expand All @@ -231,12 +230,22 @@ cfg_if::cfg_if! {

#[allow(dead_code)]
async fn establish_connection() -> SyncConnectionWrapper<SqliteConnection> {
use diesel_async::AsyncConnection;

let mut connection = connection_no_data().await;
connection.begin_test_transaction().await.unwrap();
create_tables(&mut connection).await;
connection
}

async fn connection_no_transaction() -> SyncConnectionWrapper<SqliteConnection> {
use diesel_async::AsyncConnection;

let mut connection = SyncConnectionWrapper::<SqliteConnection>::establish(":memory:").await.unwrap();
create_tables(&mut connection).await;
connection
}

} else {
compile_error!(
"At least one backend must be used to test this crate.\n \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use tokio::task::JoinError;

#[cfg(feature = "sqlite")]
mod sqlite;

fn from_tokio_join_error(join_error: JoinError) -> diesel::result::Error {
diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UnableToSendCommand,
Expand All @@ -48,7 +51,7 @@ fn from_tokio_join_error(join_error: JoinError) -> diesel::result::Error {
/// # Examples
///
/// ```rust
/// # include!("doctest_setup.rs");
/// # include!("../doctest_setup.rs");
/// use diesel_async::RunQueryDsl;
/// use schema::users;
///
Expand Down Expand Up @@ -232,7 +235,33 @@ impl<C> SyncConnectionWrapper<C> {
}
}

pub(self) fn spawn_blocking<'a, R>(
/// Run a operation directly with the inner connection
///
/// This function is usful to register custom functions
/// and collection for Sqlite for example
///
/// # Example
///
/// ```rust
/// # include!("../doctest_setup.rs");
/// # #[tokio::main]
/// # async fn main() {
/// # run_test().await.unwrap();
/// # }
/// #
/// # async fn run_test() -> QueryResult<()> {
/// # let mut conn = establish_connection().await;
/// conn.spawn_blocking(|conn| {
/// // sqlite.rs sqlite NOCASE only works for ASCII characters,
/// // this collation allows handling UTF-8 (barring locale differences)
/// conn.register_collation("RUSTNOCASE", |rhs, lhs| {
/// rhs.to_lowercase().cmp(&lhs.to_lowercase())
/// })
/// }).await
///
/// # }
/// ```
pub fn spawn_blocking<'a, R>(
&mut self,
task: impl FnOnce(&mut C) -> QueryResult<R> + Send + 'static,
) -> BoxFuture<'a, QueryResult<R>>
Expand Down
129 changes: 129 additions & 0 deletions src/sync_connection_wrapper/sqlite.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use diesel::connection::AnsiTransactionManager;
use diesel::SqliteConnection;
use scoped_futures::ScopedBoxFuture;

use crate::sync_connection_wrapper::SyncTransactionManagerWrapper;
use crate::TransactionManager;

use super::SyncConnectionWrapper;

impl SyncConnectionWrapper<SqliteConnection> {
/// Run a transaction with `BEGIN IMMEDIATE`
///
/// This method will return an error if a transaction is already open.
///
/// **WARNING:** Canceling the returned future does currently **not**
/// close an already open transaction. You may end up with a connection
/// containing a dangling transaction.
///
/// # Example
///
/// ```rust
/// # include!("../doctest_setup.rs");
/// use diesel::result::Error;
/// use scoped_futures::ScopedFutureExt;
/// use diesel_async::{RunQueryDsl, AsyncConnection};
/// #
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// # run_test().await.unwrap();
/// # }
/// #
/// # async fn run_test() -> QueryResult<()> {
/// # use schema::users::dsl::*;
/// # let conn = &mut connection_no_transaction().await;
/// conn.immediate_transaction(|conn| async move {
/// diesel::insert_into(users)
/// .values(name.eq("Ruby"))
/// .execute(conn)
/// .await?;
///
/// let all_names = users.select(name).load::<String>(conn).await?;
/// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
///
/// Ok(())
/// }.scope_boxed()).await
/// # }
/// ```
pub async fn immediate_transaction<'a, R, E, F>(&mut self, f: F) -> Result<R, E>
where
F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
E: From<diesel::result::Error> + Send + 'a,
R: Send + 'a,
{
self.transaction_sql(f, "BEGIN IMMEDIATE").await
}

/// Run a transaction with `BEGIN EXCLUSIVE`
///
/// This method will return an error if a transaction is already open.
///
/// **WARNING:** Canceling the returned future does currently **not**
/// close an already open transaction. You may end up with a connection
/// containing a dangling transaction.
///
/// # Example
///
/// ```rust
/// # include!("../doctest_setup.rs");
/// use diesel::result::Error;
/// use scoped_futures::ScopedFutureExt;
/// use diesel_async::{RunQueryDsl, AsyncConnection};
/// #
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// # run_test().await.unwrap();
/// # }
/// #
/// # async fn run_test() -> QueryResult<()> {
/// # use schema::users::dsl::*;
/// # let conn = &mut connection_no_transaction().await;
/// conn.exclusive_transaction(|conn| async move {
/// diesel::insert_into(users)
/// .values(name.eq("Ruby"))
/// .execute(conn)
/// .await?;
///
/// let all_names = users.select(name).load::<String>(conn).await?;
/// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
///
/// Ok(())
/// }.scope_boxed()).await
/// # }
/// ```
pub async fn exclusive_transaction<'a, R, E, F>(&mut self, f: F) -> Result<R, E>
where
F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
E: From<diesel::result::Error> + Send + 'a,
R: Send + 'a,
{
self.transaction_sql(f, "BEGIN EXCLUSIVE").await
}

async fn transaction_sql<'a, R, E, F>(&mut self, f: F, sql: &'static str) -> Result<R, E>
where
F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
E: From<diesel::result::Error> + Send + 'a,
R: Send + 'a,
{
self.spawn_blocking(|conn| AnsiTransactionManager::begin_transaction_sql(conn, sql))
.await?;

match f(&mut *self).await {
Ok(value) => {
SyncTransactionManagerWrapper::<AnsiTransactionManager>::commit_transaction(
&mut *self,
)
.await?;
Ok(value)
}
Err(e) => {
SyncTransactionManagerWrapper::<AnsiTransactionManager>::rollback_transaction(
&mut *self,
)
.await?;
Err(e)
}
}
}
}

0 comments on commit 622fa21

Please sign in to comment.