Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/independent decoding #149

Merged
merged 6 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 135 additions & 36 deletions fang/src/asynk/async_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,35 @@ use async_trait::async_trait;
use chrono::DateTime;
use chrono::Utc;
use cron::Schedule;
//use sqlx::any::install_default_drivers; // this is supported in sqlx 0.7
use sqlx::any::AnyConnectOptions;
use sqlx::any::AnyKind;
#[cfg(any(
feature = "asynk-postgres",
feature = "asynk-mysql",
feature = "asynk-sqlite"
))]
use sqlx::pool::PoolOptions;
use sqlx::Any;
use sqlx::AnyPool;
use sqlx::Pool;
//use sqlx::any::install_default_drivers; // this is supported in sqlx 0.7
use std::str::FromStr;
use thiserror::Error;
use typed_builder::TypedBuilder;
use uuid::Uuid;

#[cfg(feature = "asynk-postgres")]
use sqlx::PgPool;
#[cfg(feature = "asynk-postgres")]
use sqlx::Postgres;

#[cfg(feature = "asynk-mysql")]
use sqlx::MySql;
#[cfg(feature = "asynk-mysql")]
use sqlx::MySqlPool;

#[cfg(feature = "asynk-sqlite")]
use sqlx::Sqlite;
#[cfg(feature = "asynk-sqlite")]
use sqlx::SqlitePool;

#[cfg(test)]
use self::async_queue_tests::test_asynk_queue;

Expand Down Expand Up @@ -134,11 +152,51 @@ pub trait AsyncQueueable: Send {
/// .build();
/// ```
///
///

#[derive(Debug, Clone)]
pub(crate) enum InternalPool {
#[cfg(feature = "asynk-postgres")]
Pg(PgPool),
#[cfg(feature = "asynk-mysql")]
MySql(MySqlPool),
#[cfg(feature = "asynk-sqlite")]
Sqlite(SqlitePool),
}

impl InternalPool {
#[cfg(feature = "asynk-postgres")]
pub(crate) fn unwrap_pg_pool(&self) -> &PgPool {
match self {
InternalPool::Pg(pool) => pool,
#[allow(unreachable_patterns)]
_ => panic!("Not a PgPool!"),
}
}

#[cfg(feature = "asynk-mysql")]
pub(crate) fn unwrap_mysql_pool(&self) -> &MySqlPool {
match self {
InternalPool::MySql(pool) => pool,
#[allow(unreachable_patterns)]
_ => panic!("Not a MySqlPool!"),
}
}

#[cfg(feature = "asynk-sqlite")]
pub(crate) fn unwrap_sqlite_pool(&self) -> &SqlitePool {
match self {
InternalPool::Sqlite(pool) => pool,
#[allow(unreachable_patterns)]
_ => panic!("Not a SqlitePool!"),
}
}
}

#[derive(TypedBuilder, Debug, Clone)]
pub struct AsyncQueue {
#[builder(default=None, setter(skip))]
pool: Option<AnyPool>,
pool: Option<InternalPool>,
#[builder(setter(into))]
uri: String,
#[builder(setter(into))]
Expand All @@ -152,36 +210,61 @@ pub struct AsyncQueue {
#[cfg(test)]
use tokio::sync::Mutex;

#[cfg(test)]
#[cfg(all(test, feature = "asynk-postgres"))]
static ASYNC_QUEUE_POSTGRES_TEST_COUNTER: Mutex<u32> = Mutex::const_new(0);

#[cfg(test)]
#[cfg(all(test, feature = "asynk-sqlite"))]
static ASYNC_QUEUE_SQLITE_TEST_COUNTER: Mutex<u32> = Mutex::const_new(0);

#[cfg(test)]
#[cfg(all(test, feature = "asynk-mysql"))]
static ASYNC_QUEUE_MYSQL_TEST_COUNTER: Mutex<u32> = Mutex::const_new(0);

#[cfg(test)]
use sqlx::Executor;

#[cfg(test)]
#[cfg(all(test, feature = "asynk-sqlite"))]
use std::path::Path;

#[cfg(test)]
use std::env;

use super::backend_sqlx::BackendSqlX;

fn get_backend(_anykind: AnyKind) -> BackendSqlX {
match _anykind {
async fn get_backend(
kind: AnyKind,
_uri: &str,
_max_connections: u32,
) -> Result<(BackendSqlX, InternalPool), AsyncQueueError> {
match kind {
#[cfg(feature = "asynk-postgres")]
AnyKind::Postgres => BackendSqlX::Pg,
AnyKind::Postgres => {
let pool = PoolOptions::<Postgres>::new()
.max_connections(_max_connections)
.connect(_uri)
.await?;

Ok((BackendSqlX::Pg, InternalPool::Pg(pool)))
}
#[cfg(feature = "asynk-mysql")]
AnyKind::MySql => BackendSqlX::MySql,
AnyKind::MySql => {
let pool = PoolOptions::<MySql>::new()
.max_connections(_max_connections)
.connect(_uri)
.await?;

Ok((BackendSqlX::MySql, InternalPool::MySql(pool)))
}
#[cfg(feature = "asynk-sqlite")]
AnyKind::Sqlite => BackendSqlX::Sqlite,
AnyKind::Sqlite => {
let pool = PoolOptions::<Sqlite>::new()
.max_connections(_max_connections)
.connect(_uri)
.await?;

Ok((BackendSqlX::Sqlite, InternalPool::Sqlite(pool)))
}
#[allow(unreachable_patterns)]
_ => unreachable!(),
_ => panic!("Not a valid backend"),
}
}

Expand All @@ -199,22 +282,18 @@ impl AsyncQueue {
pub async fn connect(&mut self) -> Result<(), AsyncQueueError> {
//install_default_drivers();

let pool: AnyPool = PoolOptions::new()
.max_connections(self.max_pool_size)
.connect(&self.uri)
.await?;
let kind: AnyKind = self.uri.parse::<AnyConnectOptions>()?.kind();

let anykind = pool.any_kind();

self.backend = get_backend(anykind);
let (backend, pool) = get_backend(kind, &self.uri, self.max_pool_size).await?;

self.pool = Some(pool);
self.backend = backend;
self.connected = true;
Ok(())
}

async fn fetch_and_touch_task_query(
pool: &Pool<Any>,
pool: &InternalPool,
backend: &BackendSqlX,
task_type: Option<String>,
) -> Result<Option<Task>, AsyncQueueError> {
Expand Down Expand Up @@ -250,7 +329,7 @@ impl AsyncQueue {
}

async fn insert_task_query(
pool: &Pool<Any>,
pool: &InternalPool,
backend: &BackendSqlX,
metadata: &serde_json::Value,
task_type: &str,
Expand All @@ -263,15 +342,15 @@ impl AsyncQueue {
.build();

let task = backend
.execute_query(SqlXQuery::InsertTask, pool, query_params)
.execute_query(SqlXQuery::InsertTask, &pool, query_params)
.await?
.unwrap_task();

Ok(task)
}

async fn insert_task_if_not_exist_query(
pool: &Pool<Any>,
pool: &InternalPool,
backend: &BackendSqlX,
metadata: &serde_json::Value,
task_type: &str,
Expand All @@ -284,15 +363,15 @@ impl AsyncQueue {
.build();

let task = backend
.execute_query(SqlXQuery::InsertTaskIfNotExists, pool, query_params)
.execute_query(SqlXQuery::InsertTaskIfNotExists, &pool, query_params)
.await?
.unwrap_task();

Ok(task)
}

async fn schedule_task_query(
pool: &Pool<Any>,
pool: &InternalPool,
backend: &BackendSqlX,
task: &dyn AsyncRunnable,
) -> Result<Task, AsyncQueueError> {
Expand Down Expand Up @@ -553,7 +632,7 @@ impl AsyncQueueable for AsyncQueue {
}
}

#[cfg(test)]
#[cfg(all(test, feature = "asynk-postgres"))]
impl AsyncQueue {
/// Provides an AsyncQueue connected to its own DB
pub async fn test_postgres() -> Self {
Expand All @@ -575,7 +654,14 @@ impl AsyncQueue {
let create_query: &str = &format!("CREATE DATABASE {} WITH TEMPLATE fang;", db_name);
let delete_query: &str = &format!("DROP DATABASE IF EXISTS {};", db_name);

let mut conn = res.pool.as_mut().unwrap().acquire().await.unwrap();
let mut conn = res
.pool
.as_mut()
.unwrap()
.unwrap_pg_pool()
.acquire()
.await
.unwrap();

log::info!("Deleting database {db_name} ...");
conn.execute(delete_query).await.unwrap();
Expand All @@ -600,7 +686,10 @@ impl AsyncQueue {

res
}
}

#[cfg(all(test, feature = "asynk-sqlite"))]
impl AsyncQueue {
/// Provides an AsyncQueue connected to its own DB
pub async fn test_sqlite() -> Self {
dotenvy::dotenv().expect(".env file not found");
Expand Down Expand Up @@ -632,7 +721,10 @@ impl AsyncQueue {
res.connect().await.expect("fail to connect");
res
}
}

#[cfg(all(test, feature = "asynk-mysql"))]
impl AsyncQueue {
/// Provides an AsyncQueue connected to its own DB
pub async fn test_mysql() -> Self {
dotenvy::dotenv().expect(".env file not found");
Expand All @@ -657,7 +749,14 @@ impl AsyncQueue {

let delete_query: &str = &format!("DROP DATABASE IF EXISTS {};", db_name);

let mut conn = res.pool.as_mut().unwrap().acquire().await.unwrap();
let mut conn = res
.pool
.as_mut()
.unwrap()
.unwrap_mysql_pool()
.acquire()
.await
.unwrap();

log::info!("Deleting database {db_name} ...");
conn.execute(delete_query).await.unwrap();
Expand All @@ -684,11 +783,11 @@ impl AsyncQueue {
}
}

#[cfg(test)]
test_asynk_queue! {postgres, crate::AsyncQueue, crate::AsyncQueue::test_postgres()}
#[cfg(all(test, feature = "asynk-postgres"))]
test_asynk_queue! {postgres, crate::AsyncQueue,crate::AsyncQueue::test_postgres()}

#[cfg(test)]
test_asynk_queue! {sqlite, crate::AsyncQueue, crate::AsyncQueue::test_sqlite()}
#[cfg(all(test, feature = "asynk-sqlite"))]
test_asynk_queue! {sqlite, crate::AsyncQueue,crate::AsyncQueue::test_sqlite()}

#[cfg(test)]
#[cfg(all(test, feature = "asynk-mysql"))]
test_asynk_queue! {mysql, crate::AsyncQueue, crate::AsyncQueue::test_mysql()}
2 changes: 1 addition & 1 deletion fang/src/asynk/async_queue/async_queue_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ macro_rules! test_asynk_queue {
}

#[tokio::test]
async fn failed_task_query_test() {
async fn failed_task_test() {
let mut test: $q = $e.await;

let task = test.insert_task(&AsyncTask { number: 1 }).await.unwrap();
Expand Down
Loading
Loading