Skip to content

Commit

Permalink
fix: remove unwrap in create_async_runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Sep 13, 2023
1 parent a1c0745 commit ba084a9
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 48 deletions.
27 changes: 14 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions supabase-wrappers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pg_test = []

[dependencies]
pgrx = {version = "=0.9.8", default-features = false }
thiserror = "1.0.48"
tokio = { version = "1.24", features = ["rt"] }
uuid = { version = "1.2.2" }
supabase-wrappers-macros = { version = "0.1", path = "../supabase-wrappers-macros" }
Expand Down
21 changes: 19 additions & 2 deletions supabase-wrappers/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use pgrx::*;
use std::ffi::CStr;
use std::num::NonZeroUsize;
use std::ptr;
use thiserror::Error;
use tokio::runtime::{Builder, Runtime};
use uuid::Uuid;

Expand Down Expand Up @@ -106,6 +107,19 @@ pub fn report_error(code: PgSqlErrorCode, msg: &str) {
ereport!(PgLogLevel::ERROR, code, msg, "Wrappers");
}

#[derive(Error, Debug)]
pub enum CreateRuntimeError {
#[error("failed to create async runtime")]
FailedToCreateAsyncRuntime,
}

impl From<CreateRuntimeError> for ErrorReport {
fn from(value: CreateRuntimeError) -> Self {
let error_message = format!("{value}");
ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, error_message, "")
}
}

/// Create a Tokio async runtime
///
/// Use this runtime to run async code in `block` mode. Run blocked code is
Expand All @@ -132,8 +146,11 @@ pub fn report_error(code: PgSqlErrorCode, msg: &str) {
/// }
/// ```
#[inline]
pub fn create_async_runtime() -> Runtime {
Builder::new_current_thread().enable_all().build().unwrap()
pub fn create_async_runtime() -> Result<Runtime, CreateRuntimeError> {
Builder::new_current_thread()
.enable_all()
.build()
.map_err(|_| CreateRuntimeError::FailedToCreateAsyncRuntime)
}

/// Get decrypted secret from Vault
Expand Down
10 changes: 6 additions & 4 deletions wrappers/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 8 additions & 6 deletions wrappers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ pg15 = ["pgrx/pg15", "pgrx-tests/pg15", "supabase-wrappers/pg15" ]
pg_test = []

helloworld_fdw = []
bigquery_fdw = ["gcp-bigquery-client", "serde_json", "serde", "wiremock", "futures", "yup-oauth2"]
clickhouse_fdw = ["clickhouse-rs", "chrono", "chrono-tz", "regex"]
stripe_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json"]
firebase_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "yup-oauth2", "regex"]
bigquery_fdw = ["gcp-bigquery-client", "serde_json", "serde", "wiremock", "futures", "yup-oauth2", "thiserror"]
clickhouse_fdw = ["clickhouse-rs", "chrono", "chrono-tz", "regex", "thiserror"]
stripe_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "thiserror"]
firebase_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "yup-oauth2", "regex", "thiserror"]
s3_fdw = [
"reqwest", "reqwest-middleware", "reqwest-retry", "aws-config", "aws-sdk-s3",
"tokio", "tokio-util", "csv", "async-compression", "serde_json",
"http", "parquet", "futures", "arrow-array", "chrono"
]
airtable_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "serde", "url"]
logflare_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json"]
airtable_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "serde", "url", "thiserror"]
logflare_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "thiserror"]

# Does not include helloworld_fdw because of its general uselessness
all_fdws = ["airtable_fdw", "bigquery_fdw", "clickhouse_fdw", "stripe_fdw", "firebase_fdw", "s3_fdw", "logflare_fdw"]
Expand Down Expand Up @@ -72,6 +72,8 @@ http = { version = "0.2", optional = true }
parquet = { version = "41.0.0", features = ["async"], optional = true }
arrow-array = { version = "41.0.0", optional = true }

thiserror = { version = "1.0.48", optional = true }

[dev-dependencies]
pgrx-tests = "=0.9.8"

Expand Down
15 changes: 11 additions & 4 deletions wrappers/src/fdw/airtable_fdw/airtable_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::collections::HashMap;
use url::Url;

use supabase_wrappers::prelude::*;
use thiserror::Error;

use super::result::AirtableResponse;

Expand Down Expand Up @@ -95,11 +96,17 @@ macro_rules! report_fetch_error {
};
}

enum AirtableFdwError {}
#[derive(Error, Debug)]
enum AirtableFdwError {
#[error("{0}")]
CreateRuntimeError(#[from] CreateRuntimeError),
}

impl From<AirtableFdwError> for ErrorReport {
fn from(_value: AirtableFdwError) -> Self {
ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "")
fn from(value: AirtableFdwError) -> Self {
match value {
AirtableFdwError::CreateRuntimeError(e) => e.into(),
}
}
}

Expand All @@ -121,7 +128,7 @@ impl ForeignDataWrapper<AirtableFdwError> for AirtableFdw {
stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1);

Ok(Self {
rt: create_async_runtime(),
rt: create_async_runtime()?,
client,
base_url,
scan_result: None,
Expand Down
15 changes: 11 additions & 4 deletions wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::collections::HashMap;
use std::str::FromStr;

use supabase_wrappers::prelude::*;
use thiserror::Error;

macro_rules! field_type_error {
($field:ident, $err:ident) => {{
Expand Down Expand Up @@ -160,18 +161,24 @@ impl BigQueryFdw {
}
}

enum BigQueryFdwError {}
#[derive(Error, Debug)]
enum BigQueryFdwError {
#[error("{0}")]
CreateRuntimeError(#[from] CreateRuntimeError),
}

impl From<BigQueryFdwError> for ErrorReport {
fn from(_value: BigQueryFdwError) -> Self {
ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "")
fn from(value: BigQueryFdwError) -> Self {
match value {
BigQueryFdwError::CreateRuntimeError(e) => e.into(),
}
}
}

impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
fn new(options: &HashMap<String, String>) -> Result<Self, BigQueryFdwError> {
let mut ret = BigQueryFdw {
rt: create_async_runtime(),
rt: create_async_runtime()?,
client: None,
project_id: "".to_string(),
dataset_id: "".to_string(),
Expand Down
15 changes: 11 additions & 4 deletions wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use regex::{Captures, Regex};
use std::collections::HashMap;

use supabase_wrappers::prelude::*;
use thiserror::Error;

fn field_to_cell(row: &types::Row<types::Complex>, i: usize) -> Option<Cell> {
let sql_type = row.sql_type(i).unwrap();
Expand Down Expand Up @@ -198,17 +199,23 @@ impl ClickHouseFdw {
}
}

enum ClickHouseFdwError {}
#[derive(Error, Debug)]
enum ClickHouseFdwError {
#[error("{0}")]
CreateRuntimeError(#[from] CreateRuntimeError),
}

impl From<ClickHouseFdwError> for ErrorReport {
fn from(_value: ClickHouseFdwError) -> Self {
ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "")
fn from(value: ClickHouseFdwError) -> Self {
match value {
ClickHouseFdwError::CreateRuntimeError(e) => e.into(),
}
}
}

impl ForeignDataWrapper<ClickHouseFdwError> for ClickHouseFdw {
fn new(options: &HashMap<String, String>) -> Result<Self, ClickHouseFdwError> {
let rt = create_async_runtime();
let rt = create_async_runtime()?;
let conn_str = match options.get("conn_string") {
Some(conn_str) => conn_str.to_owned(),
None => require_option("conn_string_id", options)
Expand Down
15 changes: 11 additions & 4 deletions wrappers/src/fdw/firebase_fdw/firebase_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use yup_oauth2::AccessToken;
use yup_oauth2::ServiceAccountAuthenticator;

use supabase_wrappers::prelude::*;
use thiserror::Error;

macro_rules! report_request_error {
($url:ident, $err:ident) => {
Expand Down Expand Up @@ -247,18 +248,24 @@ impl FirebaseFdw {
}
}

enum FirebaseFdwError {}
#[derive(Error, Debug)]
enum FirebaseFdwError {
#[error("{0}")]
CreateRuntimeError(#[from] CreateRuntimeError),
}

impl From<FirebaseFdwError> for ErrorReport {
fn from(_value: FirebaseFdwError) -> Self {
ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "")
fn from(value: FirebaseFdwError) -> Self {
match value {
FirebaseFdwError::CreateRuntimeError(e) => e.into(),
}
}
}

impl ForeignDataWrapper<FirebaseFdwError> for FirebaseFdw {
fn new(options: &HashMap<String, String>) -> Result<Self, FirebaseFdwError> {
let mut ret = Self {
rt: create_async_runtime(),
rt: create_async_runtime()?,
project_id: "".to_string(),
client: None,
scan_result: None,
Expand Down
Loading

0 comments on commit ba084a9

Please sign in to comment.