diff --git a/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs b/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs index 56dea395..4f603ec3 100644 --- a/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs +++ b/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs @@ -10,7 +10,6 @@ use gcp_bigquery_client::{ }, Client, }; -use pgrx::pg_sys::panic::ErrorReport; use pgrx::prelude::PgSqlErrorCode; use pgrx::prelude::{AnyNumeric, Date, Timestamp}; use serde_json::json; @@ -18,70 +17,34 @@ use std::collections::HashMap; use std::str::FromStr; use supabase_wrappers::prelude::*; -use thiserror::Error; - -macro_rules! field_type_error { - ($field:ident, $err:ident) => {{ - report_error( - PgSqlErrorCode::ERRCODE_FDW_INVALID_DATA_TYPE, - &format!("get field {} failed: {}", &$field.name, $err), - ); - None - }}; -} // convert BigQuery field to Cell -fn field_to_cell(rs: &ResultSet, field: &TableFieldSchema) -> Option { - match field.r#type { - FieldType::Boolean => rs - .get_bool_by_name(&field.name) - .unwrap_or_else(|err| field_type_error!(field, err)) - .map(Cell::Bool), - FieldType::Int64 | FieldType::Integer => rs - .get_i64_by_name(&field.name) - .unwrap_or_else(|err| field_type_error!(field, err)) - .map(Cell::I64), - FieldType::Float64 | FieldType::Float => rs - .get_f64_by_name(&field.name) - .unwrap_or_else(|err| field_type_error!(field, err)) - .map(Cell::F64), - FieldType::Numeric => rs - .get_f64_by_name(&field.name) - .unwrap_or_else(|err| field_type_error!(field, err)) - .map(|v| Cell::Numeric(AnyNumeric::try_from(v).unwrap())), - FieldType::String => rs - .get_string_by_name(&field.name) - .unwrap_or_else(|err| field_type_error!(field, err)) - .map(Cell::String), - FieldType::Date => rs - .get_string_by_name(&field.name) - .unwrap_or_else(|err| field_type_error!(field, err)) - .map(|v| { - let dt = Date::from_str(&v).unwrap(); - Cell::Date(dt) - }), - FieldType::Datetime => rs - .get_string_by_name(&field.name) - .unwrap_or_else(|err| field_type_error!(field, err)) - .map(|v| { - let ts = Timestamp::from_str(&v).unwrap(); - Cell::Timestamp(ts) - }), - FieldType::Timestamp => rs - .get_f64_by_name(&field.name) - .unwrap_or_else(|err| field_type_error!(field, err)) - .map(|v| { - let ts = pgrx::to_timestamp(v); - Cell::Timestamp(ts.to_utc()) - }), +fn field_to_cell(rs: &ResultSet, field: &TableFieldSchema) -> BigQueryFdwResult> { + Ok(match field.r#type { + FieldType::Boolean => rs.get_bool_by_name(&field.name)?.map(Cell::Bool), + FieldType::Int64 | FieldType::Integer => rs.get_i64_by_name(&field.name)?.map(Cell::I64), + FieldType::Float64 | FieldType::Float => rs.get_f64_by_name(&field.name)?.map(Cell::F64), + FieldType::Numeric => match rs.get_f64_by_name(&field.name)? { + Some(v) => Some(Cell::Numeric(AnyNumeric::try_from(v)?)), + None => None, + }, + FieldType::String => rs.get_string_by_name(&field.name)?.map(Cell::String), + FieldType::Date => match rs.get_string_by_name(&field.name)? { + Some(v) => Some(Cell::Date(Date::from_str(&v)?)), + None => None, + }, + FieldType::Datetime => match rs.get_string_by_name(&field.name)? { + Some(v) => Some(Cell::Timestamp(Timestamp::from_str(&v)?)), + None => None, + }, + FieldType::Timestamp => rs.get_f64_by_name(&field.name)?.map(|v| { + let ts = pgrx::to_timestamp(v); + Cell::Timestamp(ts.to_utc()) + }), _ => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("field type {:?} not supported", field.r#type), - ); - None + return Err(BigQueryFdwError::UnsupportedFieldType(field.r#type.clone())); } - } + }) } #[wrappers_fdw( @@ -159,22 +122,26 @@ impl BigQueryFdw { sql } -} -#[derive(Error, Debug)] -enum BigQueryFdwError { - #[error("{0}")] - CreateRuntimeError(#[from] CreateRuntimeError), - #[error("{0}")] - OptionsError(#[from] OptionsError), -} - -impl From for ErrorReport { - fn from(value: BigQueryFdwError) -> Self { - match value { - BigQueryFdwError::CreateRuntimeError(e) => e.into(), - BigQueryFdwError::OptionsError(e) => e.into(), + fn extract_row( + tgt_cols: &[Column], + row: &mut Row, + rs: &mut ResultSet, + ) -> BigQueryFdwResult { + if rs.next_row() { + if let Some(schema) = &rs.query_response().schema { + if let Some(fields) = &schema.fields { + for tgt_col in tgt_cols { + if let Some(field) = fields.iter().find(|&f| f.name == tgt_col.name) { + let cell = field_to_cell(rs, field)?; + row.push(&field.name, cell); + } + } + return Ok(true); + } + } } + Ok(false) } } @@ -212,7 +179,8 @@ impl ForeignDataWrapper for BigQueryFdw { let auth_mock_uri = auth_mock.uri(); let dummy_auth_config = dummy_configuration(&auth_mock_uri); ret.auth_mock = Some(auth_mock); - serde_json::to_string_pretty(&dummy_auth_config).unwrap() + serde_json::to_string_pretty(&dummy_auth_config) + .expect("dummy auth config should not fail to serialize") } false => match options.get("sa_key") { Some(sa_key) => sa_key.to_owned(), @@ -354,26 +322,7 @@ impl ForeignDataWrapper for BigQueryFdw { fn iter_scan(&mut self, row: &mut Row) -> Result, BigQueryFdwError> { if let Some(client) = &self.client { if let Some(ref mut rs) = self.scan_result { - let mut extract_row = |rs: &mut ResultSet| { - if rs.next_row() { - if let Some(schema) = &rs.query_response().schema { - if let Some(fields) = &schema.fields { - for tgt_col in &self.tgt_cols { - if let Some(field) = - fields.iter().find(|&f| f.name == tgt_col.name) - { - let cell = field_to_cell(rs, field); - row.push(&field.name, cell); - } - } - return true; - } - } - } - false - }; - - if extract_row(rs) { + if Self::extract_row(&self.tgt_cols, row, rs)? { return Ok(Some(())); } @@ -393,7 +342,7 @@ impl ForeignDataWrapper for BigQueryFdw { Ok(resp) => { // replace result set with data from the new page *rs = ResultSet::new(QueryResponse::from(resp)); - if extract_row(rs) { + if Self::extract_row(&self.tgt_cols, row, rs)? { return Ok(Some(())); } } @@ -449,7 +398,7 @@ impl ForeignDataWrapper for BigQueryFdw { } } - insert_request.add_row(None, row_json).unwrap(); + insert_request.add_row(None, row_json)?; // execute insert job on BigQuery if let Err(err) = self.rt.block_on(client.tabledata().insert_all( @@ -525,6 +474,7 @@ impl ForeignDataWrapper for BigQueryFdw { } } +use crate::fdw::bigquery_fdw::{BigQueryFdwError, BigQueryFdwResult}; use auth_mock::GoogleAuthMock; mod auth_mock { diff --git a/wrappers/src/fdw/bigquery_fdw/mod.rs b/wrappers/src/fdw/bigquery_fdw/mod.rs index e130e831..1e98caeb 100644 --- a/wrappers/src/fdw/bigquery_fdw/mod.rs +++ b/wrappers/src/fdw/bigquery_fdw/mod.rs @@ -1,3 +1,49 @@ #![allow(clippy::module_inception)] + +use gcp_bigquery_client::error::BQError; +use gcp_bigquery_client::model::field_type::FieldType; +use pgrx::pg_sys::panic::ErrorReport; +use pgrx::{DateTimeConversionError, PgSqlErrorCode}; +use supabase_wrappers::prelude::{CreateRuntimeError, OptionsError}; +use thiserror::Error; + mod bigquery_fdw; mod tests; + +#[derive(Error, Debug)] +enum BigQueryFdwError { + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), + + #[error("{0}")] + OptionsError(#[from] OptionsError), + + #[error("big query error: {0}")] + BigQueryError(#[from] BQError), + + #[error("field type {0:?} not supported")] + UnsupportedFieldType(FieldType), + + #[error("{0}")] + NumericConversionError(#[from] pgrx::numeric::Error), + + #[error("{0}")] + DateTimeConversionError(#[from] DateTimeConversionError), +} + +impl From for ErrorReport { + fn from(value: BigQueryFdwError) -> Self { + match value { + BigQueryFdwError::CreateRuntimeError(e) => e.into(), + BigQueryFdwError::OptionsError(e) => e.into(), + BigQueryFdwError::BigQueryError(e) => ErrorReport::new( + PgSqlErrorCode::ERRCODE_FDW_INVALID_DATA_TYPE, + format!("{e}"), + "", + ), + _ => ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{value}"), ""), + } + } +} + +type BigQueryFdwResult = Result;