Skip to content

Commit

Permalink
qe: Introduce JS transaction objects (#4138)
Browse files Browse the repository at this point in the history
* qe: Introduce JS transaction objects (engine only)

Part of the fix for prisma/team-orm#269.
Instead of tracking TX status as a flag on a driver instance (that does
not allow to run 2 transactions in parallel, or transactional code in
parallel with non-transactional), each time transaction is started we
create new, independent `Transaction` object. Multiple `Transactions`
can co-exist and execute quries independently from one another.

This PR implements Rust part of the change only. Merging it without TS
part will break all transactions instead of fixing race condition.

Summary of the changes:
- converts quaint's `Transaction` into a trait and makes
  `TransactionCapable` return `dyn Transaction`.
- for native drivers, exactly same `Transaction` implementation is used.
- for JS drivers, `JsTransaction` struct is implemented. It delegates
  all it's functionality (including commit and rollaback) to JS side.
- `Proxy` struct is split into 3: `CommonProxy` for methods common
  between transaction and the driver, `DriverProxy` for driver-specific
  ones and `TransactionProxy` for transaction-specific ones.
- Similary, `JsBaseQueryable` is a queryable over either transaction or
  driver while `JsQueryable` now means driver only. `JsBaseQueryable`
  is used by both `JsTransaction` and `JsQueryable` internally.

* Clippy

* use `dyn Transaction` in `server_reset_query`

* Remove duplicate trait comments as per review

* feat: exclude `version()` and `is_healthy` from `js-connectors`'s `Queryable` (#4139)

* feat(js-connectors): [rust] remove implementations of "version()" and "is_healthy()"

* feat(js-connectors): [typescript] remove implementations of "version()" and "is_healthy()"

* chore: remove dead code

* JS implementation for planetscale

* Correct way of handling rollbacks

* Add neon implementation

* Add test & fix planetscale isolation levels

* Post rebase fixes

* Update query-engine/js-connectors/js/neon-js-connector/src/neon.ts

Co-authored-by: Alberto Schiabel <jkomyno@users.noreply.github.com>

* Update query-engine/js-connectors/js/neon-js-connector/src/neon.ts

Co-authored-by: Alberto Schiabel <jkomyno@users.noreply.github.com>

* Update query-engine/js-connectors/js/js-connector-utils/src/binder.ts

Co-authored-by: Alberto Schiabel <jkomyno@users.noreply.github.com>

* Update query-engine/js-connectors/js/js-connector-utils/src/binder.ts

Co-authored-by: Alberto Schiabel <jkomyno@users.noreply.github.com>

* Update query-engine/js-connectors/js/js-connector-utils/src/types.ts

Co-authored-by: Alberto Schiabel <jkomyno@users.noreply.github.com>

* Update query-engine/js-connectors/js/planetscale-js-connector/src/planetscale.ts

Co-authored-by: Alberto Schiabel <jkomyno@users.noreply.github.com>

* Update neon

---------

Co-authored-by: Alberto Schiabel <jkomyno@users.noreply.github.com>
  • Loading branch information
SevInf and jkomyno authored Aug 18, 2023
1 parent 1071f46 commit 0bc8ee6
Show file tree
Hide file tree
Showing 28 changed files with 664 additions and 554 deletions.
13 changes: 9 additions & 4 deletions quaint/src/connector/mssql.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
mod conversion;
mod error;

use super::{IsolationLevel, TransactionOptions};
use super::{IsolationLevel, Transaction, TransactionOptions};
use crate::{
ast::{Query, Value},
connector::{metrics, queryable::*, ResultSet, Transaction},
connector::{metrics, queryable::*, DefaultTransaction, ResultSet},
error::{Error, ErrorKind},
visitor::{self, Visitor},
};
Expand Down Expand Up @@ -96,7 +96,10 @@ static SQL_SERVER_DEFAULT_ISOLATION: IsolationLevel = IsolationLevel::ReadCommit

#[async_trait]
impl TransactionCapable for Mssql {
async fn start_transaction(&self, isolation: Option<IsolationLevel>) -> crate::Result<Transaction<'_>> {
async fn start_transaction<'a>(
&'a self,
isolation: Option<IsolationLevel>,
) -> crate::Result<Box<dyn Transaction + 'a>> {
// Isolation levels in SQL Server are set on the connection and live until they're changed.
// Always explicitly setting the isolation level each time a tx is started (either to the given value
// or by using the default/connection string value) prevents transactions started on connections from
Expand All @@ -107,7 +110,9 @@ impl TransactionCapable for Mssql {

let opts = TransactionOptions::new(isolation, self.requires_isolation_first());

Transaction::new(self, self.begin_statement(), opts).await
Ok(Box::new(
DefaultTransaction::new(self, self.begin_statement(), opts).await?,
))
}
}

Expand Down
2 changes: 1 addition & 1 deletion quaint/src/connector/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ impl Mysql {
}
}

impl TransactionCapable for Mysql {}
impl_default_TransactionCapable!(Mysql);

#[async_trait]
impl Queryable for Mysql {
Expand Down
8 changes: 4 additions & 4 deletions quaint/src/connector/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod error;

use crate::{
ast::{Query, Value},
connector::{metrics, queryable::*, ResultSet, Transaction},
connector::{metrics, queryable::*, ResultSet},
error::{Error, ErrorKind},
visitor::{self, Visitor},
};
Expand Down Expand Up @@ -34,7 +34,7 @@ pub(crate) const DEFAULT_SCHEMA: &str = "public";
#[cfg(feature = "expose-drivers")]
pub use tokio_postgres;

use super::IsolationLevel;
use super::{IsolationLevel, Transaction};

#[derive(Clone)]
struct Hidden<T>(T);
Expand Down Expand Up @@ -765,7 +765,7 @@ impl Display for SetSearchPath<'_> {
}
}

impl TransactionCapable for PostgreSql {}
impl_default_TransactionCapable!(PostgreSql);

#[async_trait]
impl Queryable for PostgreSql {
Expand Down Expand Up @@ -912,7 +912,7 @@ impl Queryable for PostgreSql {
self.is_healthy.load(Ordering::SeqCst)
}

async fn server_reset_query(&self, tx: &Transaction<'_>) -> crate::Result<()> {
async fn server_reset_query(&self, tx: &dyn Transaction) -> crate::Result<()> {
if self.pg_bouncer {
tx.raw_cmd("DEALLOCATE ALL").await
} else {
Expand Down
37 changes: 27 additions & 10 deletions quaint/src/connector/queryable.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{IsolationLevel, ResultSet, Transaction, TransactionOptions};
use super::{IsolationLevel, ResultSet, Transaction};
use crate::ast::*;
use async_trait::async_trait;

Expand Down Expand Up @@ -82,7 +82,7 @@ pub trait Queryable: Send + Sync {
}

/// Execute an arbitrary function in the beginning of each transaction.
async fn server_reset_query(&self, _: &Transaction<'_>) -> crate::Result<()> {
async fn server_reset_query(&self, _: &dyn Transaction) -> crate::Result<()> {
Ok(())
}

Expand All @@ -101,13 +101,30 @@ pub trait Queryable: Send + Sync {

/// A thing that can start a new transaction.
#[async_trait]
pub trait TransactionCapable: Queryable
where
Self: Sized,
{
pub trait TransactionCapable: Queryable {
/// Starts a new transaction
async fn start_transaction(&self, isolation: Option<IsolationLevel>) -> crate::Result<Transaction<'_>> {
let opts = TransactionOptions::new(isolation, self.requires_isolation_first());
Transaction::new(self, self.begin_statement(), opts).await
}
async fn start_transaction<'a>(
&'a self,
isolation: Option<IsolationLevel>,
) -> crate::Result<Box<dyn Transaction + 'a>>;
}

macro_rules! impl_default_TransactionCapable {
($t:ty) => {
#[async_trait]
impl TransactionCapable for $t {
async fn start_transaction<'a>(
&'a self,
isolation: Option<IsolationLevel>,
) -> crate::Result<Box<dyn crate::connector::Transaction + 'a>> {
let opts = crate::connector::TransactionOptions::new(isolation, self.requires_isolation_first());

Ok(Box::new(
crate::connector::DefaultTransaction::new(self, self.begin_statement(), opts).await?,
))
}
}
};
}

pub(crate) use impl_default_TransactionCapable;
2 changes: 1 addition & 1 deletion quaint/src/connector/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl Sqlite {
}
}

impl TransactionCapable for Sqlite {}
impl_default_TransactionCapable!(Sqlite);

#[async_trait]
impl Queryable for Sqlite {
Expand Down
37 changes: 28 additions & 9 deletions quaint/src/connector/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ use std::{fmt, str::FromStr};

extern crate metrics as metrics;

#[async_trait]
pub trait Transaction: Queryable {
/// Commit the changes to the database and consume the transaction.
async fn commit(&self) -> crate::Result<()>;

/// Rolls back the changes to the database.
async fn rollback(&self) -> crate::Result<()>;

/// workaround for lack of upcasting between traits https://github.com/rust-lang/rust/issues/65991
fn as_queryable(&self) -> &dyn Queryable;
}

pub(crate) struct TransactionOptions {
/// The isolation level to use.
pub(crate) isolation_level: Option<IsolationLevel>,
Expand All @@ -17,21 +29,21 @@ pub(crate) struct TransactionOptions {
pub(crate) isolation_first: bool,
}

/// A representation of an SQL database transaction. If not commited, a
/// A default representation of an SQL database transaction. If not commited, a
/// transaction will be rolled back by default when dropped.
///
/// Currently does not support nesting, so starting a new transaction using the
/// transaction object will panic.
pub struct Transaction<'a> {
pub(crate) inner: &'a dyn Queryable,
pub struct DefaultTransaction<'a> {
pub inner: &'a dyn Queryable,
}

impl<'a> Transaction<'a> {
impl<'a> DefaultTransaction<'a> {
pub(crate) async fn new(
inner: &'a dyn Queryable,
begin_stmt: &str,
tx_opts: TransactionOptions,
) -> crate::Result<Transaction<'a>> {
) -> crate::Result<DefaultTransaction<'a>> {
let this = Self { inner };

if tx_opts.isolation_first {
Expand All @@ -53,26 +65,33 @@ impl<'a> Transaction<'a> {
increment_gauge!("prisma_client_queries_active", 1.0);
Ok(this)
}
}

#[async_trait]
impl<'a> Transaction for DefaultTransaction<'a> {
/// Commit the changes to the database and consume the transaction.
pub async fn commit(&self) -> crate::Result<()> {
async fn commit(&self) -> crate::Result<()> {
decrement_gauge!("prisma_client_queries_active", 1.0);
self.inner.raw_cmd("COMMIT").await?;

Ok(())
}

/// Rolls back the changes to the database.
pub async fn rollback(&self) -> crate::Result<()> {
async fn rollback(&self) -> crate::Result<()> {
decrement_gauge!("prisma_client_queries_active", 1.0);
self.inner.raw_cmd("ROLLBACK").await?;

Ok(())
}

fn as_queryable(&self) -> &dyn Queryable {
self
}
}

#[async_trait]
impl<'a> Queryable for Transaction<'a> {
impl<'a> Queryable for DefaultTransaction<'a> {
async fn query(&self, q: Query<'_>) -> crate::Result<ResultSet> {
self.inner.query(q).await
}
Expand Down Expand Up @@ -171,7 +190,7 @@ impl FromStr for IsolationLevel {
}
}
impl TransactionOptions {
pub(crate) fn new(isolation_level: Option<IsolationLevel>, isolation_first: bool) -> Self {
pub fn new(isolation_level: Option<IsolationLevel>, isolation_first: bool) -> Self {
Self {
isolation_level,
isolation_first,
Expand Down
6 changes: 3 additions & 3 deletions quaint/src/pooled/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::connector::MysqlUrl;
use crate::connector::PostgresUrl;
use crate::{
ast,
connector::{self, IsolationLevel, Queryable, Transaction, TransactionCapable},
connector::{self, impl_default_TransactionCapable, IsolationLevel, Queryable, Transaction, TransactionCapable},
error::Error,
};
use async_trait::async_trait;
Expand All @@ -18,7 +18,7 @@ pub struct PooledConnection {
pub(crate) inner: MobcPooled<QuaintManager>,
}

impl TransactionCapable for PooledConnection {}
impl_default_TransactionCapable!(PooledConnection);

#[async_trait]
impl Queryable for PooledConnection {
Expand Down Expand Up @@ -58,7 +58,7 @@ impl Queryable for PooledConnection {
self.inner.is_healthy()
}

async fn server_reset_query(&self, tx: &Transaction<'_>) -> crate::Result<()> {
async fn server_reset_query(&self, tx: &dyn Transaction) -> crate::Result<()> {
self.inner.server_reset_query(tx).await
}

Expand Down
2 changes: 1 addition & 1 deletion quaint/src/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! A "prelude" for users of the `quaint` crate.
pub use crate::ast::*;
pub use crate::connector::{
ConnectionInfo, Queryable, ResultRow, ResultSet, SqlFamily, Transaction, TransactionCapable,
ConnectionInfo, DefaultTransaction, Queryable, ResultRow, ResultSet, SqlFamily, TransactionCapable,
};
pub use crate::{col, val, values};
4 changes: 2 additions & 2 deletions quaint/src/single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::connector::DEFAULT_SQLITE_SCHEMA_NAME;
use crate::{
ast,
connector::{self, ConnectionInfo, IsolationLevel, Queryable, TransactionCapable},
connector::{self, impl_default_TransactionCapable, ConnectionInfo, IsolationLevel, Queryable, TransactionCapable},
};
use async_trait::async_trait;
use std::{fmt, sync::Arc};
Expand All @@ -25,7 +25,7 @@ impl fmt::Debug for Quaint {
}
}

impl TransactionCapable for Quaint {}
impl_default_TransactionCapable!(Quaint);

impl Quaint {
/// Create a new connection to the database. The connection string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ where
let fut_tx = self.inner.start_transaction(isolation_level);

catch(self.connection_info.clone(), async move {
let tx: quaint::connector::Transaction = fut_tx.await.map_err(SqlError::from)?;
let tx = fut_tx.await.map_err(SqlError::from)?;

Ok(Box::new(SqlConnectorTransaction::new(tx, connection_info, features)) as Box<dyn Transaction>)
})
Expand Down
16 changes: 12 additions & 4 deletions query-engine/connectors/sql-query-connector/src/database/js.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use connector_interface::{
};
use once_cell::sync::Lazy;
use quaint::{
connector::IsolationLevel,
connector::{IsolationLevel, Transaction},
prelude::{Queryable as QuaintQueryable, *},
};
use std::{
Expand All @@ -32,7 +32,7 @@ fn registered_js_connector(provider: &str) -> connector::Result<JsConnector> {
.map(|conn_ref| conn_ref.to_owned())
}

pub fn register_js_connector(provider: &str, connector: Arc<dyn QuaintQueryable>) -> Result<(), String> {
pub fn register_js_connector(provider: &str, connector: Arc<dyn TransactionCapable>) -> Result<(), String> {
let mut lock = REGISTRY.lock().unwrap();
let entry = lock.entry(provider.to_string());
match entry {
Expand Down Expand Up @@ -128,7 +128,7 @@ impl Connector for Js {
// in this object, and implementing TransactionCapable (and quaint::Queryable) explicitly for it.
#[derive(Clone)]
struct JsConnector {
connector: Arc<dyn QuaintQueryable>,
connector: Arc<dyn TransactionCapable>,
}

#[async_trait]
Expand Down Expand Up @@ -183,4 +183,12 @@ impl QuaintQueryable for JsConnector {
}
}

impl TransactionCapable for JsConnector {}
#[async_trait]
impl TransactionCapable for JsConnector {
async fn start_transaction<'a>(
&'a self,
isolation: Option<IsolationLevel>,
) -> quaint::Result<Box<dyn Transaction + 'a>> {
self.connector.start_transaction(isolation).await
}
}
Loading

0 comments on commit 0bc8ee6

Please sign in to comment.