Skip to content

Commit

Permalink
fix: require_option returns result
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Sep 13, 2023
1 parent e1aa8ff commit 6913cf6
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 119 deletions.
1 change: 0 additions & 1 deletion supabase-wrappers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ thiserror = "1.0.48"
tokio = { version = "1.24", features = ["rt"] }
uuid = { version = "1.2.2" }
supabase-wrappers-macros = { version = "0.1", path = "../supabase-wrappers-macros" }
thiserror = "1.0.48"

[dev-dependencies]
pgrx-tests = "=0.9.8"
Expand Down
2 changes: 1 addition & 1 deletion wrappers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ firebase_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json",
s3_fdw = [
"reqwest", "reqwest-middleware", "reqwest-retry", "aws-config", "aws-sdk-s3",
"tokio", "tokio-util", "csv", "async-compression", "serde_json",
"http", "parquet", "futures", "arrow-array", "chrono"
"http", "parquet", "futures", "arrow-array", "chrono", "thiserror"
]
airtable_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "serde", "url", "thiserror"]
logflare_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "thiserror"]
Expand Down
22 changes: 11 additions & 11 deletions wrappers/src/fdw/airtable_fdw/airtable_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,15 @@ macro_rules! report_fetch_error {
enum AirtableFdwError {
#[error("{0}")]
CreateRuntimeError(#[from] CreateRuntimeError),
#[error("{0}")]
Options(#[from] OptionsError),
}

impl From<AirtableFdwError> for ErrorReport {
fn from(value: AirtableFdwError) -> Self {
match value {
AirtableFdwError::CreateRuntimeError(e) => e.into(),
AirtableFdwError::Options(e) => e.into(),
}
}
}
Expand All @@ -120,9 +123,10 @@ impl ForeignDataWrapper<AirtableFdwError> for AirtableFdw {

let client = match options.get("api_key") {
Some(api_key) => Some(create_client(api_key)),
None => require_option("api_key_id", options)
.and_then(|key_id| get_vault_secret(&key_id))
.map(|api_key| create_client(&api_key)),
None => {
let key_id = require_option("api_key_id", options)?;
get_vault_secret(&key_id).map(|api_key| create_client(&api_key))
}
};

stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1);
Expand All @@ -143,14 +147,10 @@ impl ForeignDataWrapper<AirtableFdwError> for AirtableFdw {
_limit: &Option<Limit>, // TODO: maxRecords
options: &HashMap<String, String>,
) -> Result<(), AirtableFdwError> {
let url = if let Some(url) = require_option("base_id", options).and_then(|base_id| {
require_option("table_id", options)
.map(|table_id| self.build_url(&base_id, &table_id, options.get("view_id")))
}) {
url
} else {
return Ok(());
};
let base_id = require_option("base_id", options)?;
let table_id = require_option("table_id", options)?;
let view_id = options.get("view_id");
let url = self.build_url(&base_id, &table_id, view_id);

let mut rows = Vec::new();
if let Some(client) = &self.client {
Expand Down
36 changes: 9 additions & 27 deletions wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,15 @@ impl BigQueryFdw {
enum BigQueryFdwError {
#[error("{0}")]
CreateRuntimeError(#[from] CreateRuntimeError),
#[error("{0}")]
Options(#[from] OptionsError),
}

impl From<BigQueryFdwError> for ErrorReport {
fn from(value: BigQueryFdwError) -> Self {
match value {
BigQueryFdwError::CreateRuntimeError(e) => e.into(),
BigQueryFdwError::Options(e) => e.into(),
}
}
}
Expand All @@ -180,24 +183,15 @@ impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
let mut ret = BigQueryFdw {
rt: create_async_runtime()?,
client: None,
project_id: "".to_string(),
dataset_id: "".to_string(),
project_id: require_option("project_id", options)?.to_string(),
dataset_id: require_option("dataset_id", options)?.to_string(),
table: "".to_string(),
rowid_col: "".to_string(),
tgt_cols: Vec::new(),
scan_result: None,
auth_mock: None,
};

let project_id = require_option("project_id", options);
let dataset_id = require_option("dataset_id", options);

if project_id.is_none() || dataset_id.is_none() {
return Ok(ret);
}
ret.project_id = project_id.unwrap();
ret.dataset_id = dataset_id.unwrap();

// Is authentication mocked
let mock_auth: bool = options
.get("mock_auth")
Expand All @@ -223,10 +217,7 @@ impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
false => match options.get("sa_key") {
Some(sa_key) => sa_key.to_owned(),
None => {
let sa_key_id = match require_option("sa_key_id", options) {
Some(sa_key_id) => sa_key_id,
None => return Ok(ret),
};
let sa_key_id = require_option("sa_key_id", options)?;
match get_vault_secret(&sa_key_id) {
Some(sa_key) => sa_key,
None => return Ok(ret),
Expand Down Expand Up @@ -285,11 +276,7 @@ impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
limit: &Option<Limit>,
options: &HashMap<String, String>,
) -> Result<(), BigQueryFdwError> {
let table = require_option("table", options);
if table.is_none() {
return Ok(());
}
self.table = table.unwrap();
self.table = require_option("table", options)?.to_string();
self.tgt_cols = columns.to_vec();

let location = options
Expand Down Expand Up @@ -432,13 +419,8 @@ impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
}

fn begin_modify(&mut self, options: &HashMap<String, String>) -> Result<(), BigQueryFdwError> {
let table = require_option("table", options);
let rowid_col = require_option("rowid_column", options);
if table.is_none() || rowid_col.is_none() {
return Ok(());
}
self.table = table.unwrap();
self.rowid_col = rowid_col.unwrap();
self.table = require_option("table", options)?.to_string();
self.rowid_col = require_option("rowid_column", options)?.to_string();

Ok(())
}
Expand Down
25 changes: 10 additions & 15 deletions wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,15 @@ impl ClickHouseFdw {
enum ClickHouseFdwError {
#[error("{0}")]
CreateRuntimeError(#[from] CreateRuntimeError),
#[error("{0}")]
Options(#[from] OptionsError),
}

impl From<ClickHouseFdwError> for ErrorReport {
fn from(value: ClickHouseFdwError) -> Self {
match value {
ClickHouseFdwError::CreateRuntimeError(e) => e.into(),
ClickHouseFdwError::Options(e) => e.into(),
}
}
}
Expand All @@ -218,9 +221,10 @@ impl ForeignDataWrapper<ClickHouseFdwError> for ClickHouseFdw {
let rt = create_async_runtime()?;
let conn_str = match options.get("conn_string") {
Some(conn_str) => conn_str.to_owned(),
None => require_option("conn_string_id", options)
.and_then(|conn_str_id| get_vault_secret(&conn_str_id))
.unwrap_or_default(),
None => {
let conn_str_id = require_option("conn_string_id", options)?;
get_vault_secret(&conn_str_id).unwrap_or_default()
}
};

stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1);
Expand Down Expand Up @@ -248,11 +252,7 @@ impl ForeignDataWrapper<ClickHouseFdwError> for ClickHouseFdw {
) -> Result<(), ClickHouseFdwError> {
self.create_client();

let table = require_option("table", options);
if table.is_none() {
return Ok(());
}
self.table = table.unwrap();
self.table = require_option("table", options)?.to_string();
self.tgt_cols = columns.to_vec();
self.row_idx = 0;

Expand Down Expand Up @@ -329,13 +329,8 @@ impl ForeignDataWrapper<ClickHouseFdwError> for ClickHouseFdw {
) -> Result<(), ClickHouseFdwError> {
self.create_client();

let table = require_option("table", options);
let rowid_col = require_option("rowid_column", options);
if table.is_none() || rowid_col.is_none() {
return Ok(());
}
self.table = table.unwrap();
self.rowid_col = rowid_col.unwrap();
self.table = require_option("table", options)?.to_string();
self.rowid_col = require_option("rowid_column", options)?.to_string();
Ok(())
}

Expand Down
20 changes: 6 additions & 14 deletions wrappers/src/fdw/firebase_fdw/firebase_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,15 @@ impl FirebaseFdw {
enum FirebaseFdwError {
#[error("{0}")]
CreateRuntimeError(#[from] CreateRuntimeError),
#[error("{0}")]
Options(#[from] OptionsError),
}

impl From<FirebaseFdwError> for ErrorReport {
fn from(value: FirebaseFdwError) -> Self {
match value {
FirebaseFdwError::CreateRuntimeError(e) => e.into(),
FirebaseFdwError::Options(e) => e.into(),
}
}
}
Expand All @@ -266,16 +269,11 @@ impl ForeignDataWrapper<FirebaseFdwError> for FirebaseFdw {
fn new(options: &HashMap<String, String>) -> Result<Self, FirebaseFdwError> {
let mut ret = Self {
rt: create_async_runtime()?,
project_id: "".to_string(),
project_id: require_option("project_id", options)?.to_string(),
client: None,
scan_result: None,
};

ret.project_id = match require_option("project_id", options) {
Some(project_id) => project_id,
None => return Ok(ret),
};

// get oauth2 access token if it is directly defined in options
let token = if let Some(access_token) = options.get("access_token") {
access_token.to_owned()
Expand All @@ -284,10 +282,7 @@ impl ForeignDataWrapper<FirebaseFdwError> for FirebaseFdw {
let sa_key = match options.get("sa_key") {
Some(sa_key) => sa_key.to_owned(),
None => {
let sa_key_id = match require_option("sa_key_id", options) {
Some(sa_key_id) => sa_key_id,
None => return Ok(ret),
};
let sa_key_id = require_option("sa_key_id", options)?;
match get_vault_secret(&sa_key_id) {
Some(sa_key) => sa_key,
None => return Ok(ret),
Expand Down Expand Up @@ -330,10 +325,7 @@ impl ForeignDataWrapper<FirebaseFdwError> for FirebaseFdw {
_limit: &Option<Limit>,
options: &HashMap<String, String>,
) -> Result<(), FirebaseFdwError> {
let obj = match require_option("object", options) {
Some(obj) => obj,
None => return Ok(()),
};
let obj = require_option("object", options)?;
let row_cnt_limit = options
.get("limit")
.map(|n| n.parse::<usize>().unwrap())
Expand Down
12 changes: 7 additions & 5 deletions wrappers/src/fdw/logflare_fdw/logflare_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,16 +218,17 @@ impl LogflareFdw {

#[derive(Error, Debug)]
enum LogflareFdwError {
#[error("{0}")]
Options(#[from] OptionsError),
#[error("{0}")]
CreateRuntimeError(#[from] CreateRuntimeError),
#[error("{0}")]
Options(#[from] OptionsError),
}

impl From<LogflareFdwError> for ErrorReport {
fn from(value: LogflareFdwError) -> Self {
match value {
LogflareFdwError::CreateRuntimeError(e) => e.into(),
LogflareFdwError::Options(e) => e.into(),
}
}
}
Expand All @@ -247,9 +248,10 @@ impl ForeignDataWrapper<LogflareFdwError> for LogflareFdw {
.unwrap_or_else(|| LogflareFdw::BASE_URL.to_string());
let client = match options.get("api_key") {
Some(api_key) => Some(create_client(api_key)),
None => require_option("api_key_id", options)?
.and_then(|key_id| get_vault_secret(&key_id))
.map(|api_key| create_client(&api_key)),
None => {
let key_id = require_option("api_key_id", options)?;
get_vault_secret(&key_id).map(|api_key| create_client(&api_key))
}
};

stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1);
Expand Down
Loading

0 comments on commit 6913cf6

Please sign in to comment.