Skip to content

Commit

Permalink
WIP: js sql-query-connector
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelff committed Jul 24, 2023
1 parent 1548dc1 commit 80b099d
Show file tree
Hide file tree
Showing 14 changed files with 244 additions and 228 deletions.
1 change: 1 addition & 0 deletions psl/builtin-connectors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub use mongodb::MongoDbType;
pub use mssql_datamodel_connector::{MsSqlType, MsSqlTypeParameter};
pub use mysql_datamodel_connector::MySqlType;
pub use postgres_datamodel_connector::{PostgresDatasourceProperties, PostgresType};
pub use psl_core::js_connector::JsConnector;

mod mongodb;
mod mssql_datamodel_connector;
Expand Down
9 changes: 9 additions & 0 deletions psl/psl-core/src/js_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ pub struct JsConnector {
pub allowed_protocols: Option<&'static [&'static str]>,
}

impl JsConnector {
/// Returns true if the given name is a valid provider name for a JsConnector.
/// We use the convention that if a provider starts with ´@prisma/´ (ex. ´@prisma/planetscale´)
/// then its a provider for a JS connector.
pub fn is_provider(name: &str) -> bool {
name.starts_with("@prisma/")
}
}

#[derive(Copy, Clone)]
pub enum Flavor {
MySQL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub(crate) struct SqlConnection<C> {

impl<C> SqlConnection<C>
where
C: Queryable + TransactionCapable + Send + Sync + 'static,
C: TransactionCapable + Send + Sync + 'static,
{
pub fn new(inner: C, connection_info: &ConnectionInfo, features: psl::PreviewFeatures) -> Self {
let connection_info = connection_info.clone();
Expand Down
191 changes: 191 additions & 0 deletions query-engine/connectors/sql-query-connector/src/database/js.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
use super::connection::SqlConnection;
use crate::FromSource;
use async_trait::async_trait;
use connector_interface::{
self as connector,
error::{ConnectorError, ErrorKind},
Connection, Connector,
};

use once_cell::sync::Lazy;
use quaint::{
connector::IsolationLevel,
prelude::{Queryable as QuaintQueryable, *},
};
use std::{
collections::{hash_map::Entry, HashMap},
sync::{Arc, Mutex},
};

/// Registry is the type for the global registry of Js connectors.
type Registry = HashMap<String, JsTransactionCapable>;

/// REGISTRY is the global registry of JsConnectors
static REGISTRY: Lazy<Mutex<Registry>> = Lazy::new(|| Mutex::new(HashMap::new()));

pub fn registered_js_connector(provider: &str) -> connector::Result<JsTransactionCapable> {
let lock = REGISTRY.lock().unwrap();
lock.get(provider)
.ok_or(ConnectorError::from_kind(ErrorKind::UnsupportedConnector(format!(
"A Javascript connector proxy for {} was not registered",
provider
))))
.map(|conn_ref| conn_ref.to_owned())
}

pub fn register_js_connector(provider: &str, connector: Arc<dyn QuaintQueryable>) -> Result<(), String> {
let mut lock = REGISTRY.lock().unwrap();
let entry = lock.entry(provider.to_string());
match entry {
Entry::Occupied(_) => Err(format!(
"A Javascript connector proxy for {} was already registered, skipping",
provider
)),
Entry::Vacant(v) => {
v.insert(JsTransactionCapable { queryable: connector });
Ok(())
}
}
}

pub struct Js {
queryable: JsTransactionCapable,
connection_info: ConnectionInfo,
features: psl::PreviewFeatures,
psl_connector: psl::builtin_connectors::JsConnector,
}

fn get_connection_info(url: &str) -> connector::Result<ConnectionInfo> {
let database_str = url;

let connection_info = ConnectionInfo::from_url(database_str).map_err(|err| {
ConnectorError::from_kind(ErrorKind::InvalidDatabaseUrl {
details: err.to_string(),
url: database_str.to_string(),
})
})?;

Ok(connection_info)
}

#[async_trait]
impl FromSource for Js {
async fn from_source(
source: &psl::Datasource,
url: &str,
features: psl::PreviewFeatures,
) -> connector_interface::Result<Js> {
if let Some(psl_connector) = source.active_connector.as_js_connector() {
let driver = registered_js_connector(source.active_provider)?;
let connection_info = get_connection_info(url)?;

return Ok(Js {
queryable: driver.clone(),
connection_info,
features: features.to_owned(),
psl_connector: psl_connector.clone(),
});
} else {
panic!(
"Connector for provider {} is not a JsConnector",
source.active_connector.provider_name()
)
}
}
}

#[async_trait]
impl Connector for Js {
async fn get_connection<'a>(&'a self) -> connector::Result<Box<dyn Connection + Send + Sync + 'static>> {
super::catch(self.connection_info.clone(), async move {
let runtime_conn = self.queryable.clone();
let sql_conn = SqlConnection::new(runtime_conn, &self.connection_info, self.features);
Ok(Box::new(sql_conn) as Box<dyn Connection + Send + Sync + 'static>)
})
.await
}

fn name(&self) -> &'static str {
self.psl_connector.provider_name
}

fn should_retry_on_transient_error(&self) -> bool {
false
}
}

// TODO: miguelff: I haven´t found a better way to do this, yet... please continue reading.
//
// There is a bug in NAPI-rs by wich compiling a binary crate that links code using napi-rs
// bindings breaks. We could have used a JsQueryable from the `js-connectors` crate directly, as the
// `connection` field of a `Js` connector, but that will imply using napi-rs transitively, and break
// the tests (which are compiled as binary creates)
//
// To avoid the problem above I separated interface from implementation, making JsConnector
// independent on napi-rs. Initially, I tried having a field Arc<&dyn TransactionCabable> to hold
// JsQueryable at runtime. I did this, because TransactionCapable is the trait bounds required to
// create a value of `SqlConnection` (see [SqlConnection::new])) to actually performt the queries.
// using JSQueryable. However, this didn't work because TransactionCapable is not object safe.
// (has Sized as a supertrait)
//
// The thing is that TransactionCapable is not object safe and cannot be used in a dynamic type
// declaration, so finally I couldn't come up with anything better then wrapping a QuaintQueryable
// in this object, and implementing TransactionCapable (and quaint::Queryable) explicitly for it.
#[derive(Clone)]
pub struct JsTransactionCapable {
queryable: Arc<dyn QuaintQueryable>,
}

#[async_trait]
impl QuaintQueryable for JsTransactionCapable {
async fn query(&self, q: Query<'_>) -> quaint::Result<quaint::prelude::ResultSet> {
self.queryable.query(q).await
}

async fn query_raw(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result<quaint::prelude::ResultSet> {
self.queryable.query_raw(sql, params).await
}

async fn query_raw_typed(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result<quaint::prelude::ResultSet> {
self.queryable.query_raw_typed(sql, params).await
}

async fn execute(&self, q: Query<'_>) -> quaint::Result<u64> {
self.queryable.execute(q).await
}

async fn execute_raw(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result<u64> {
self.queryable.execute_raw(sql, params).await
}

async fn execute_raw_typed(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result<u64> {
self.queryable.execute_raw_typed(sql, params).await
}

/// Run a command in the database, for queries that can't be run using
/// prepared statements.
async fn raw_cmd(&self, cmd: &str) -> quaint::Result<()> {
self.queryable.raw_cmd(cmd).await
}

async fn version(&self) -> quaint::Result<Option<String>> {
self.queryable.version().await
}

fn is_healthy(&self) -> bool {
self.queryable.is_healthy()
}

/// Sets the transaction isolation level to given value.
/// Implementers have to make sure that the passed isolation level is valid for the underlying database.
async fn set_tx_isolation_level(&self, isolation_level: IsolationLevel) -> quaint::Result<()> {
self.queryable.set_tx_isolation_level(isolation_level).await
}

/// Signals if the isolation level SET needs to happen before or after the tx BEGIN.
fn requires_isolation_first(&self) -> bool {
self.queryable.requires_isolation_first()
}
}

impl TransactionCapable for JsTransactionCapable {}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#[cfg(feature = "js-connectors")]
pub mod js;
mod runtime;

mod connection;
#[cfg(feature = "js-connectors")]
mod js;
mod mssql;
mod mysql;
mod postgresql;
Expand All @@ -14,6 +12,8 @@ pub(crate) mod operations;
use async_trait::async_trait;
use connector_interface::{error::ConnectorError, Connector};

#[cfg(feature = "js-connectors")]
pub use js::*;
pub use mssql::*;
pub use mysql::*;
pub use postgresql::*;
Expand Down
34 changes: 4 additions & 30 deletions query-engine/connectors/sql-query-connector/src/database/mysql.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::connection::SqlConnection;
use super::runtime::RuntimePool;
use crate::{FromSource, SqlError};
use async_trait::async_trait;
use connector_interface::{
Expand All @@ -11,7 +10,7 @@ use quaint::{pooled::Quaint, prelude::ConnectionInfo};
use std::time::Duration;

pub struct Mysql {
pool: RuntimePool,
pool: Quaint,
connection_info: ConnectionInfo,
features: psl::PreviewFeatures,
}
Expand Down Expand Up @@ -39,31 +38,10 @@ fn get_connection_info(url: &str) -> connector::Result<ConnectionInfo> {
#[async_trait]
impl FromSource for Mysql {
async fn from_source(
source: &psl::Datasource,
_: &psl::Datasource,
url: &str,
features: psl::PreviewFeatures,
) -> connector_interface::Result<Mysql> {
if source.provider == "@prisma/mysql" {
#[cfg(feature = "js-connectors")]
{
let driver = super::js::registered_driver();
let connection_info = get_connection_info(url)?;

return Ok(Mysql {
pool: RuntimePool::Js(driver.unwrap().clone()),
connection_info,
features: features.to_owned(),
});
}

#[cfg(not(feature = "js-connectors"))]
{
return Err(ConnectorError::from_kind(ErrorKind::UnsupportedConnector(
"The @prisma/mysql connector requires the `jsConnectors` preview feature to be enabled.".into(),
)));
}
}

let connection_info = get_connection_info(url)?;

let mut builder = Quaint::builder(url)
Expand All @@ -77,7 +55,7 @@ impl FromSource for Mysql {
let connection_info = pool.connection_info().to_owned();

Ok(Mysql {
pool: RuntimePool::Rust(pool),
pool,
connection_info,
features: features.to_owned(),
})
Expand All @@ -99,11 +77,7 @@ impl Connector for Mysql {
}

fn name(&self) -> &'static str {
if self.pool.is_nodejs() {
"@prisma/mysql"
} else {
"mysql"
}
"mysql"
}

fn should_retry_on_transient_error(&self) -> bool {
Expand Down
Loading

0 comments on commit 80b099d

Please sign in to comment.