Skip to content

Commit

Permalink
refactor: separate options module
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Sep 13, 2023
1 parent 5306290 commit a1c0745
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 115 deletions.
1 change: 1 addition & 0 deletions supabase-wrappers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pgrx = {version = "=0.9.8", default-features = false }
tokio = { version = "1.24", features = ["rt"] }
uuid = { version = "1.2.2" }
supabase-wrappers-macros = { version = "0.1", path = "../supabase-wrappers-macros" }
thiserror = "1.0.48"

[dev-dependencies]
pgrx-tests = "=0.9.8"
Expand Down
8 changes: 3 additions & 5 deletions supabase-wrappers/src/instance.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use crate::prelude::*;
use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable};
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::prelude::*;

use super::utils;

// create a fdw instance
pub(super) unsafe fn create_fdw_instance<E: Into<ErrorReport>, W: ForeignDataWrapper<E>>(
ftable_id: pg_sys::Oid,
) -> W {
let ftable = pg_sys::GetForeignTable(ftable_id);
let fserver = pg_sys::GetForeignServer((*ftable).serverid);
let fserver_opts = utils::options_to_hashmap((*fserver).options);
let fserver_opts = options_to_hashmap((*fserver).options);
let wrapper = W::new(&fserver_opts);
wrapper.map_err(|e| e.into()).report()
wrapper.report_unwrap()
}
2 changes: 2 additions & 0 deletions supabase-wrappers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,13 @@
//! - [Logflare](https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/logflare_fdw): A FDW for [Logflare](https://logflare.app/) which supports data read only.
pub mod interface;
pub mod options;
pub mod utils;

/// The prelude includes all necessary imports to make Wrappers work
pub mod prelude {
pub use crate::interface::*;
pub use crate::options::*;
pub use crate::utils::*;
pub use crate::wrappers_fdw;
pub use ::tokio::runtime::Runtime;
Expand Down
25 changes: 9 additions & 16 deletions supabase-wrappers/src/modify.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable};
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::{
debug2, memcxt::PgMemoryContexts, pg_sys::Oid, prelude::*, rel::PgRelation,
tupdesc::PgTupleDesc, FromDatum, PgSqlErrorCode,
Expand Down Expand Up @@ -81,12 +81,8 @@ pub(super) extern "C" fn add_foreign_update_targets(
unsafe {
// get rowid column name from table options
let ftable = pg_sys::GetForeignTable((*target_relation).rd_id);
let opts = utils::options_to_hashmap((*ftable).options);
let rowid_name = if let Some(name) = require_option("rowid_column", &opts) {
name
} else {
return;
};
let opts = options_to_hashmap((*ftable).options);
let rowid_name = require_option("rowid_column", &opts).report_unwrap();

// find rowid attribute
let tup_desc = PgTupleDesc::from_pg_copy((*target_relation).rd_att);
Expand Down Expand Up @@ -139,7 +135,7 @@ pub(super) extern "C" fn plan_foreign_modify<E: Into<ErrorReport>, W: ForeignDat

// get rowid column name from table options
let ftable = pg_sys::GetForeignTable(rel.oid());
let opts = utils::options_to_hashmap((*ftable).options);
let opts = options_to_hashmap((*ftable).options);
let rowid_name = opts.get("rowid_column");
if rowid_name.is_none() {
report_error(
Expand Down Expand Up @@ -208,7 +204,7 @@ pub(super) extern "C" fn begin_foreign_modify<E: Into<ErrorReport>, W: ForeignDa
state.rowid_attno =
pg_sys::ExecFindJunkAttributeInTlist((*subplan).targetlist, rowid_name_c);

state.begin_modify().map_err(|e| e.into()).report();
state.begin_modify().report_unwrap();

(*rinfo).ri_FdwState = state.into_pg() as _;
}
Expand All @@ -228,7 +224,7 @@ pub(super) extern "C" fn exec_foreign_insert<E: Into<ErrorReport>, W: ForeignDat
);

let row = utils::tuple_table_slot_to_row(slot);
state.insert(&row).map_err(|e| e.into()).report();
state.insert(&row).report_unwrap();
}

slot
Expand Down Expand Up @@ -258,7 +254,7 @@ pub(super) extern "C" fn exec_foreign_delete<E: Into<ErrorReport>, W: ForeignDat

let cell = get_rowid_cell(&state, plan_slot);
if let Some(rowid) = cell {
state.delete(&rowid).map_err(|e| e.into()).report();
state.delete(&rowid).report_unwrap();
}
}

Expand Down Expand Up @@ -292,10 +288,7 @@ pub(super) extern "C" fn exec_foreign_update<E: Into<ErrorReport>, W: ForeignDat
}) && state.rowid_name != col.as_str()
});

state
.update(&rowid, &new_row)
.map_err(|e| e.into())
.report();
state.update(&rowid, &new_row).report_unwrap();
}
}

Expand All @@ -312,7 +305,7 @@ pub(super) extern "C" fn end_foreign_modify<E: Into<ErrorReport>, W: ForeignData
let fdw_state = (*rinfo).ri_FdwState as *mut FdwModifyState<E, W>;
if !fdw_state.is_null() {
let mut state = PgBox::<FdwModifyState<E, W>>::from_pg(fdw_state);
state.end_modify().map_err(|e| e.into()).report();
state.end_modify().report_unwrap();
}
}
}
114 changes: 114 additions & 0 deletions supabase-wrappers/src/options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use crate::utils::report_error;
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::{pg_sys, PgList, PgSqlErrorCode};
use std::collections::HashMap;
use std::ffi::CStr;
use thiserror::Error;

#[derive(Error, Debug)]
pub enum OptionsError {
#[error("required option `{0}` is not specified")]
OptionNameNotFound(String),
#[error("an option name is not a valid UTF-8 string")]
OptionNameIsInvalidUtf8,
#[error("an option value is not a valid UTF-8 string")]
OptionValueIsInvalidUtf8,
}

impl From<OptionsError> for ErrorReport {
fn from(value: OptionsError) -> Self {
let error_message = format!("{value}");
match value {
OptionsError::OptionNameNotFound(_) => ErrorReport::new(
PgSqlErrorCode::ERRCODE_FDW_OPTION_NAME_NOT_FOUND,
error_message,
"",
),
OptionsError::OptionNameIsInvalidUtf8 | OptionsError::OptionValueIsInvalidUtf8 => {
ErrorReport::new(
PgSqlErrorCode::ERRCODE_FDW_INVALID_STRING_FORMAT,
error_message,
"",
)
}
}
}
}

/// Get required option value from the `options` map
///
/// Get the required option's value from `options` map, return None and report
/// error and stop current transaction if it does not exist.
///
/// For example,
///
/// ```rust,no_run
/// # use supabase_wrappers::prelude::require_option;
/// # use std::collections::HashMap;
/// # let options = &HashMap::new();
/// require_option("my_option", options);
/// ```
pub fn require_option<'map>(
opt_name: &str,
options: &'map HashMap<String, String>,
) -> Result<&'map str, OptionsError> {
options
.get(opt_name)
.map(|t| t.as_ref())
.ok_or(OptionsError::OptionNameNotFound(opt_name.to_string()))
}

/// Get required option value from the `options` map or a provided default
///
/// Get the required option's value from `options` map, return default if it does not exist.
///
/// For example,
///
/// ```rust,no_run
/// # use supabase_wrappers::prelude::require_option_or;
/// # use std::collections::HashMap;
/// # let options = &HashMap::new();
/// require_option_or("my_option", options, "default value".to_string());
/// ```
pub fn require_option_or(
opt_name: &str,
options: &HashMap<String, String>,
default: String,
) -> String {
options
.get(opt_name)
.map(|t| t.to_owned())
.unwrap_or(default)
}

/// Check if the option list contains a specific option, used in [validator](crate::interface::ForeignDataWrapper::validator)
pub fn check_options_contain(opt_list: &[Option<String>], tgt: &str) {
let search_key = tgt.to_owned() + "=";
if !opt_list.iter().any(|opt| {
if let Some(s) = opt {
s.starts_with(&search_key)
} else {
false
}
}) {
report_error(
PgSqlErrorCode::ERRCODE_FDW_OPTION_NAME_NOT_FOUND,
&format!("required option \"{}\" is not specified", tgt),
);
}
}

// convert options definition to hashmap
pub(super) unsafe fn options_to_hashmap(options: *mut pg_sys::List) -> HashMap<String, String> {
let mut ret = HashMap::new();
let options: PgList<pg_sys::DefElem> = PgList::from_pg(options);
for option in options.iter_ptr() {
let name = CStr::from_ptr((*option).defname);
let value = CStr::from_ptr(pg_sys::defGetString(option));
ret.insert(
name.to_str().unwrap().to_owned(),
value.to_str().unwrap().to_owned(),
);
}
ret
}
17 changes: 9 additions & 8 deletions supabase-wrappers/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ use pgrx::{
use std::collections::HashMap;
use std::marker::PhantomData;

use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable};
use pgrx::pg_sys::panic::ErrorReport;
use std::os::raw::c_int;
use std::ptr;

use crate::instance;
use crate::interface::{Cell, Column, Limit, Qual, Row, Sort, Value};
use crate::limit::*;
use crate::memctx;
use crate::options::options_to_hashmap;
use crate::polyfill;
use crate::prelude::ForeignDataWrapper;
use crate::qual::*;
use crate::sort::*;
use crate::utils::{self, report_error, SerdeList};
use crate::utils::{self, report_error, ReportableError, SerdeList};

// Fdw private state for scan
struct FdwState<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> {
Expand Down Expand Up @@ -137,10 +138,10 @@ pub(super) extern "C" fn get_foreign_rel_size<E: Into<ErrorReport>, W: ForeignDa

// get foreign table options
let ftable = pg_sys::GetForeignTable(foreigntableid);
state.opts = utils::options_to_hashmap((*ftable).options);
state.opts = options_to_hashmap((*ftable).options);

// get estimate row count and mean row width
let (rows, width) = state.get_rel_size().map_err(|e| e.into()).report();
let (rows, width) = state.get_rel_size().report_unwrap();
(*baserel).rows = rows as f64;
(*(*baserel).reltarget).width = width;

Expand Down Expand Up @@ -305,7 +306,7 @@ pub(super) extern "C" fn begin_foreign_scan<E: Into<ErrorReport>, W: ForeignData

// begin scan if it is not EXPLAIN statement
if eflags & pg_sys::EXEC_FLAG_EXPLAIN_ONLY as c_int <= 0 {
state.begin_scan().map_err(|e| e.into()).report();
state.begin_scan().report_unwrap();

let rel = scan_state.ss_currentRelation;
let tup_desc = (*rel).rd_att;
Expand Down Expand Up @@ -336,7 +337,7 @@ pub(super) extern "C" fn iterate_foreign_scan<E: Into<ErrorReport>, W: ForeignDa
polyfill::exec_clear_tuple(slot);

state.row.clear();
if state.iter_scan().map_err(|e| e.into()).report().is_some() {
if state.iter_scan().report_unwrap().is_some() {
if state.row.cols.len() != state.tgts.len() {
report_error(
PgSqlErrorCode::ERRCODE_FDW_INVALID_COLUMN_NUMBER,
Expand Down Expand Up @@ -375,7 +376,7 @@ pub(super) extern "C" fn re_scan_foreign_scan<E: Into<ErrorReport>, W: ForeignDa
let fdw_state = (*node).fdw_state as *mut FdwState<E, W>;
if !fdw_state.is_null() {
let mut state = PgBox::<FdwState<E, W>>::from_pg(fdw_state);
state.re_scan().map_err(|e| e.into()).report();
state.re_scan().report_unwrap();
}
}
}
Expand All @@ -392,6 +393,6 @@ pub(super) extern "C" fn end_foreign_scan<E: Into<ErrorReport>, W: ForeignDataWr
}

let mut state = PgBox::<FdwState<E, W>>::from_pg(fdw_state);
state.end_scan().map_err(|e| e.into()).report();
state.end_scan().report_unwrap();
}
}
Loading

0 comments on commit a1c0745

Please sign in to comment.