diff --git a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs index a14d5fd1..613dda63 100644 --- a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs +++ b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs @@ -1,7 +1,5 @@ use crate::stats; use pgrx::pg_sys; -use pgrx::pg_sys::panic::ErrorReport; -use pgrx::prelude::PgSqlErrorCode; use reqwest::{self, header}; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; @@ -9,9 +7,9 @@ use std::collections::HashMap; use url::Url; use supabase_wrappers::prelude::*; -use thiserror::Error; use super::result::AirtableResponse; +use super::{AirtableFdwError, AirtableFdwResult}; fn create_client(api_key: &str) -> ClientWithMiddleware { let mut headers = header::HeaderMap::new(); @@ -56,13 +54,12 @@ impl AirtableFdw { } } - #[inline] fn set_limit_offset( &self, url: &str, page_size: Option, offset: Option<&str>, - ) -> Result { + ) -> AirtableFdwResult { let mut params = Vec::new(); if let Some(page_size) = page_size { params.push(("pageSize", format!("{}", page_size))); @@ -71,48 +68,29 @@ impl AirtableFdw { params.push(("offset", offset.to_string())); } - Url::parse_with_params(url, ¶ms).map(|x| x.into()) + Ok(Url::parse_with_params(url, ¶ms).map(|x| x.into())?) } // convert response body text to rows - fn parse_resp(&self, resp_body: &str, columns: &[Column]) -> (Vec, Option) { + fn parse_resp( + &self, + resp_body: &str, + columns: &[Column], + ) -> AirtableFdwResult<(Vec, Option)> { let response: AirtableResponse = serde_json::from_str(resp_body).unwrap(); let mut result = Vec::new(); for record in response.records.iter() { - result.push(record.to_row(columns)); + result.push(record.to_row(columns)?); } - (result, response.offset) - } -} - -macro_rules! report_fetch_error { - ($url:ident, $err:ident) => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("fetch {} failed: {}", $url, $err), - ) - }; -} - -#[derive(Error, Debug)] -enum AirtableFdwError { - #[error("{0}")] - CreateRuntimeError(#[from] CreateRuntimeError), -} - -impl From for ErrorReport { - fn from(value: AirtableFdwError) -> Self { - match value { - AirtableFdwError::CreateRuntimeError(e) => e.into(), - } + Ok((result, response.offset)) } } // TODO Add support for INSERT, UPDATE, DELETE impl ForeignDataWrapper for AirtableFdw { - fn new(options: &HashMap) -> Result { + fn new(options: &HashMap) -> AirtableFdwResult { let base_url = options .get("api_url") .map(|t| t.to_owned()) @@ -142,7 +120,7 @@ impl ForeignDataWrapper for AirtableFdw { _sorts: &[Sort], // TODO: Propagate sort _limit: &Option, // TODO: maxRecords options: &HashMap, - ) -> Result<(), AirtableFdwError> { + ) -> AirtableFdwResult<()> { let url = if let Some(url) = require_option("base_id", options).and_then(|base_id| { require_option("table_id", options) .map(|table_id| self.build_url(&base_id, &table_id, options.get("view_id"))) @@ -160,38 +138,23 @@ impl ForeignDataWrapper for AirtableFdw { // Fetch all of the rows upfront. Arguably, this could be done in batches (and invoked each // time iter_scan() runs out of rows) to pipeline the I/O, but we'd have to manage more // state so starting with the simpler solution. - let url = match self.set_limit_offset(&url, None, offset.as_deref()) { - Ok(url) => url, - Err(err) => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("internal error: {}", err), - ); - return Ok(()); - } - }; - - match self.rt.block_on(client.get(&url).send()) { - Ok(resp) => match resp.error_for_status() { - Ok(resp) => { - stats::inc_stats( - Self::FDW_NAME, - stats::Metric::BytesIn, - resp.content_length().unwrap_or(0) as i64, - ); - let body = self.rt.block_on(resp.text()).unwrap(); - let (new_rows, new_offset) = self.parse_resp(&body, columns); - rows.extend(new_rows.into_iter()); - - if let Some(new_offset) = new_offset { - offset = Some(new_offset); - } else { - break; - } - } - Err(err) => report_fetch_error!(url, err), - }, - Err(err) => report_fetch_error!(url, err), + let url = self.set_limit_offset(&url, None, offset.as_deref())?; + + let body = self.rt.block_on(client.get(&url).send()).and_then(|resp| { + resp.error_for_status() + .and_then(|resp| self.rt.block_on(resp.text())) + .map_err(reqwest_middleware::Error::from) + })?; + + let (new_rows, new_offset) = self.parse_resp(&body, columns)?; + rows.extend(new_rows); + + stats::inc_stats(Self::FDW_NAME, stats::Metric::BytesIn, body.len() as i64); + + if let Some(new_offset) = new_offset { + offset = Some(new_offset); + } else { + break; } } } @@ -203,7 +166,7 @@ impl ForeignDataWrapper for AirtableFdw { Ok(()) } - fn iter_scan(&mut self, row: &mut Row) -> Result, AirtableFdwError> { + fn iter_scan(&mut self, row: &mut Row) -> AirtableFdwResult> { if let Some(ref mut result) = self.scan_result { if !result.is_empty() { return Ok(result @@ -215,7 +178,7 @@ impl ForeignDataWrapper for AirtableFdw { Ok(None) } - fn end_scan(&mut self) -> Result<(), AirtableFdwError> { + fn end_scan(&mut self) -> AirtableFdwResult<()> { self.scan_result.take(); Ok(()) } @@ -223,7 +186,7 @@ impl ForeignDataWrapper for AirtableFdw { fn validator( options: Vec>, catalog: Option, - ) -> Result<(), AirtableFdwError> { + ) -> AirtableFdwResult<()> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { check_options_contain(&options, "base_id"); diff --git a/wrappers/src/fdw/airtable_fdw/mod.rs b/wrappers/src/fdw/airtable_fdw/mod.rs index cb08ddf3..5a571e28 100644 --- a/wrappers/src/fdw/airtable_fdw/mod.rs +++ b/wrappers/src/fdw/airtable_fdw/mod.rs @@ -1,3 +1,35 @@ #![allow(clippy::module_inception)] mod airtable_fdw; mod result; + +use pgrx::pg_sys::panic::ErrorReport; +use pgrx::prelude::PgSqlErrorCode; +use thiserror::Error; + +use supabase_wrappers::prelude::CreateRuntimeError; + +#[derive(Error, Debug)] +enum AirtableFdwError { + #[error("column '{0}' data type is not supported")] + UnsupportedColumnType(String), + + #[error("column '{0}' data type not match")] + ColumnTypeNotMatch(String), + + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), + + #[error("parse url failed: {0}")] + UrlParseError(#[from] url::ParseError), + + #[error("request failed: {0}")] + RequestError(#[from] reqwest_middleware::Error), +} + +impl From for ErrorReport { + fn from(value: AirtableFdwError) -> Self { + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{value}"), "") + } +} + +type AirtableFdwResult = Result; diff --git a/wrappers/src/fdw/airtable_fdw/result.rs b/wrappers/src/fdw/airtable_fdw/result.rs index 2aeee2ee..c5526f91 100644 --- a/wrappers/src/fdw/airtable_fdw/result.rs +++ b/wrappers/src/fdw/airtable_fdw/result.rs @@ -1,14 +1,15 @@ use pgrx::pg_sys; -use pgrx::prelude::PgSqlErrorCode; use serde::de::{MapAccess, Visitor}; use serde::{Deserialize, Deserializer}; -use serde_json::{value::Number, Value}; +use serde_json::Value; use std::collections::HashMap; use std::fmt; use std::marker::PhantomData; use std::str::FromStr; use supabase_wrappers::prelude::*; +use super::{AirtableFdwError, AirtableFdwResult}; + #[derive(Deserialize, Debug)] pub struct AirtableResponse { pub records: Vec, @@ -84,21 +85,9 @@ impl<'de> Deserialize<'de> for AirtableFields { } impl AirtableRecord { - pub fn to_row(&self, columns: &[Column]) -> Row { + pub(super) fn to_row(&self, columns: &[Column]) -> AirtableFdwResult { let mut row = Row::new(); - macro_rules! col_to_cell { - ($col:ident, $src_type:ident, $conv:expr) => {{ - self.fields.0.get(&$col.name).and_then(|val| { - if let Value::$src_type(v) = val { - $conv(v) - } else { - panic!("column '{}' data type not match", $col.name) - } - }) - }}; - } - for col in columns.iter() { if col.name == "id" { row.push("id", Some(Cell::String(self.id.clone()))); @@ -106,52 +95,129 @@ impl AirtableRecord { } let cell = match col.type_oid { - pg_sys::BOOLOID => col_to_cell!(col, Bool, |v: &bool| Some(Cell::Bool(*v))), - pg_sys::CHAROID => col_to_cell!(col, Number, |v: &Number| { - v.as_i64().map(|n| Cell::I8(n as i8)) - }), - pg_sys::INT2OID => col_to_cell!(col, Number, |v: &Number| { - v.as_i64().map(|n| Cell::I16(n as i16)) - }), - pg_sys::FLOAT4OID => col_to_cell!(col, Number, |v: &Number| { - v.as_f64().map(|n| Cell::F32(n as f32)) - }), - pg_sys::INT4OID => col_to_cell!(col, Number, |v: &Number| { - v.as_i64().map(|n| Cell::I32(n as i32)) - }), - pg_sys::FLOAT8OID => { - col_to_cell!(col, Number, |v: &Number| { v.as_f64().map(Cell::F64) }) - } - pg_sys::INT8OID => { - col_to_cell!(col, Number, |v: &Number| { v.as_i64().map(Cell::I64) }) - } - pg_sys::NUMERICOID => col_to_cell!(col, Number, |v: &Number| { - v.as_f64() - .map(|n| Cell::Numeric(pgrx::AnyNumeric::try_from(n).unwrap())) - }), - pg_sys::TEXTOID => { - col_to_cell!(col, String, |v: &String| { Some(Cell::String(v.clone())) }) - } - pg_sys::DATEOID => col_to_cell!(col, String, |v: &String| { - pgrx::Date::from_str(v.as_str()).ok().map(Cell::Date) - }), - pg_sys::TIMESTAMPOID => col_to_cell!(col, String, |v: &String| { - pgrx::Timestamp::from_str(v.as_str()) - .ok() - .map(Cell::Timestamp) - }), - _ => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("column '{}' data type not supported", col.name), - ); - None - } - }; + pg_sys::BOOLOID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Bool(v) = val { + Ok(Some(Cell::Bool(*v))) + } else { + Err(()) + } + }, + ), + pg_sys::CHAROID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_i64().map(|n| Cell::I8(n as i8))) + } else { + Err(()) + } + }, + ), + pg_sys::INT2OID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_i64().map(|n| Cell::I16(n as i16))) + } else { + Err(()) + } + }, + ), + pg_sys::FLOAT4OID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_f64().map(|n| Cell::F32(n as f32))) + } else { + Err(()) + } + }, + ), + pg_sys::INT4OID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_i64().map(|n| Cell::I32(n as i32))) + } else { + Err(()) + } + }, + ), + pg_sys::FLOAT8OID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_f64().map(Cell::F64)) + } else { + Err(()) + } + }, + ), + pg_sys::INT8OID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + Ok(v.as_i64().map(Cell::I64)) + } else { + Err(()) + } + }, + ), + pg_sys::NUMERICOID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::Number(v) = val { + let n = v + .as_f64() + .map(|n| Cell::Numeric(pgrx::AnyNumeric::try_from(n).unwrap())); + Ok(n) + } else { + Err(()) + } + }, + ), + pg_sys::TEXTOID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::String(v) = val { + Ok(Some(Cell::String(v.clone()))) + } else { + Err(()) + } + }, + ), + pg_sys::DATEOID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::String(v) = val { + Ok(pgrx::Date::from_str(v.as_str()).ok().map(Cell::Date)) + } else { + Err(()) + } + }, + ), + pg_sys::TIMESTAMPOID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::String(v) = val { + let n = pgrx::Timestamp::from_str(v.as_str()) + .ok() + .map(Cell::Timestamp); + Ok(n) + } else { + Err(()) + } + }, + ), + _ => return Err(AirtableFdwError::UnsupportedColumnType(col.name.clone())), + } + .map_err(|_| AirtableFdwError::ColumnTypeNotMatch(col.name.clone()))?; row.push(&col.name, cell); } - row + Ok(row) } }