Commit

Merge pull request #164 from supabase/chore/bigquery-fdw-error
chore(bigquery_fdw): refactor error reporting
imor authored Oct 4, 2023
2 parents 4b64a6e + b989b16 commit b4cfd2d
Showing 2 changed files with 94 additions and 98 deletions.
146 changes: 48 additions & 98 deletions wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs
@@ -10,78 +10,41 @@ use gcp_bigquery_client::{
},
Client,
};
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::prelude::PgSqlErrorCode;
use pgrx::prelude::{AnyNumeric, Date, Timestamp};
use serde_json::json;
use std::collections::HashMap;
use std::str::FromStr;

use supabase_wrappers::prelude::*;
use thiserror::Error;

macro_rules! field_type_error {
($field:ident, $err:ident) => {{
report_error(
PgSqlErrorCode::ERRCODE_FDW_INVALID_DATA_TYPE,
&format!("get field {} failed: {}", &$field.name, $err),
);
None
}};
}

// convert BigQuery field to Cell
fn field_to_cell(rs: &ResultSet, field: &TableFieldSchema) -> Option<Cell> {
match field.r#type {
FieldType::Boolean => rs
.get_bool_by_name(&field.name)
.unwrap_or_else(|err| field_type_error!(field, err))
.map(Cell::Bool),
FieldType::Int64 | FieldType::Integer => rs
.get_i64_by_name(&field.name)
.unwrap_or_else(|err| field_type_error!(field, err))
.map(Cell::I64),
FieldType::Float64 | FieldType::Float => rs
.get_f64_by_name(&field.name)
.unwrap_or_else(|err| field_type_error!(field, err))
.map(Cell::F64),
FieldType::Numeric => rs
.get_f64_by_name(&field.name)
.unwrap_or_else(|err| field_type_error!(field, err))
.map(|v| Cell::Numeric(AnyNumeric::try_from(v).unwrap())),
FieldType::String => rs
.get_string_by_name(&field.name)
.unwrap_or_else(|err| field_type_error!(field, err))
.map(Cell::String),
FieldType::Date => rs
.get_string_by_name(&field.name)
.unwrap_or_else(|err| field_type_error!(field, err))
.map(|v| {
let dt = Date::from_str(&v).unwrap();
Cell::Date(dt)
}),
FieldType::Datetime => rs
.get_string_by_name(&field.name)
.unwrap_or_else(|err| field_type_error!(field, err))
.map(|v| {
let ts = Timestamp::from_str(&v).unwrap();
Cell::Timestamp(ts)
}),
FieldType::Timestamp => rs
.get_f64_by_name(&field.name)
.unwrap_or_else(|err| field_type_error!(field, err))
.map(|v| {
let ts = pgrx::to_timestamp(v);
Cell::Timestamp(ts.to_utc())
}),
fn field_to_cell(rs: &ResultSet, field: &TableFieldSchema) -> BigQueryFdwResult<Option<Cell>> {
Ok(match field.r#type {
FieldType::Boolean => rs.get_bool_by_name(&field.name)?.map(Cell::Bool),
FieldType::Int64 | FieldType::Integer => rs.get_i64_by_name(&field.name)?.map(Cell::I64),
FieldType::Float64 | FieldType::Float => rs.get_f64_by_name(&field.name)?.map(Cell::F64),
FieldType::Numeric => match rs.get_f64_by_name(&field.name)? {
Some(v) => Some(Cell::Numeric(AnyNumeric::try_from(v)?)),
None => None,
},
FieldType::String => rs.get_string_by_name(&field.name)?.map(Cell::String),
FieldType::Date => match rs.get_string_by_name(&field.name)? {
Some(v) => Some(Cell::Date(Date::from_str(&v)?)),
None => None,
},
FieldType::Datetime => match rs.get_string_by_name(&field.name)? {
Some(v) => Some(Cell::Timestamp(Timestamp::from_str(&v)?)),
None => None,
},
FieldType::Timestamp => rs.get_f64_by_name(&field.name)?.map(|v| {
let ts = pgrx::to_timestamp(v);
Cell::Timestamp(ts.to_utc())
}),
_ => {
report_error(
PgSqlErrorCode::ERRCODE_FDW_ERROR,
&format!("field type {:?} not supported", field.r#type),
);
None
return Err(BigQueryFdwError::UnsupportedFieldType(field.r#type.clone()));
}
}
})
}

#[wrappers_fdw(
@@ -159,22 +122,26 @@ impl BigQueryFdw {

sql
}
}

#[derive(Error, Debug)]
enum BigQueryFdwError {
#[error("{0}")]
CreateRuntimeError(#[from] CreateRuntimeError),
#[error("{0}")]
OptionsError(#[from] OptionsError),
}

impl From<BigQueryFdwError> for ErrorReport {
fn from(value: BigQueryFdwError) -> Self {
match value {
BigQueryFdwError::CreateRuntimeError(e) => e.into(),
BigQueryFdwError::OptionsError(e) => e.into(),
fn extract_row(
tgt_cols: &[Column],
row: &mut Row,
rs: &mut ResultSet,
) -> BigQueryFdwResult<bool> {
if rs.next_row() {
if let Some(schema) = &rs.query_response().schema {
if let Some(fields) = &schema.fields {
for tgt_col in tgt_cols {
if let Some(field) = fields.iter().find(|&f| f.name == tgt_col.name) {
let cell = field_to_cell(rs, field)?;
row.push(&field.name, cell);
}
}
return Ok(true);
}
}
}
Ok(false)
}
}

@@ -212,7 +179,8 @@ impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
let auth_mock_uri = auth_mock.uri();
let dummy_auth_config = dummy_configuration(&auth_mock_uri);
ret.auth_mock = Some(auth_mock);
serde_json::to_string_pretty(&dummy_auth_config).unwrap()
serde_json::to_string_pretty(&dummy_auth_config)
.expect("dummy auth config should not fail to serialize")
}
false => match options.get("sa_key") {
Some(sa_key) => sa_key.to_owned(),
@@ -354,26 +322,7 @@ impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
fn iter_scan(&mut self, row: &mut Row) -> Result<Option<()>, BigQueryFdwError> {
if let Some(client) = &self.client {
if let Some(ref mut rs) = self.scan_result {
let mut extract_row = |rs: &mut ResultSet| {
if rs.next_row() {
if let Some(schema) = &rs.query_response().schema {
if let Some(fields) = &schema.fields {
for tgt_col in &self.tgt_cols {
if let Some(field) =
fields.iter().find(|&f| f.name == tgt_col.name)
{
let cell = field_to_cell(rs, field);
row.push(&field.name, cell);
}
}
return true;
}
}
}
false
};

if extract_row(rs) {
if Self::extract_row(&self.tgt_cols, row, rs)? {
return Ok(Some(()));
}

@@ -393,7 +342,7 @@ impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
Ok(resp) => {
// replace result set with data from the new page
*rs = ResultSet::new(QueryResponse::from(resp));
if extract_row(rs) {
if Self::extract_row(&self.tgt_cols, row, rs)? {
return Ok(Some(()));
}
}
@@ -449,7 +398,7 @@ impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
}
}

insert_request.add_row(None, row_json).unwrap();
insert_request.add_row(None, row_json)?;

// execute insert job on BigQuery
if let Err(err) = self.rt.block_on(client.tabledata().insert_all(
@@ -525,6 +474,7 @@ impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
}
}

use crate::fdw::bigquery_fdw::{BigQueryFdwError, BigQueryFdwResult};
use auth_mock::GoogleAuthMock;

mod auth_mock {
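
The refactored `field_to_cell` and `extract_row` above propagate failures with `?` into a typed error, instead of calling `report_error` in place and yielding `None`. A minimal, self-contained sketch of that pattern, using hypothetical `Field`, `Cell`, and `FieldError` stand-ins rather than the BigQuery types from the diff:

use thiserror::Error;

#[derive(Error, Debug)]
enum FieldError {
    #[error("invalid integer: {0}")]
    ParseInt(#[from] std::num::ParseIntError),
    #[error("field type {0} not supported")]
    Unsupported(String),
}

// Hypothetical stand-ins for the BigQuery schema and cell types.
struct Field {
    r#type: String,
    raw: Option<String>,
}

enum Cell {
    I64(i64),
    String(String),
}

// Failures bubble up via `?`; the caller decides how to surface them,
// rather than each branch reporting the error and returning None itself.
fn field_to_cell(field: &Field) -> Result<Option<Cell>, FieldError> {
    Ok(match field.r#type.as_str() {
        "INT64" => match &field.raw {
            Some(v) => Some(Cell::I64(v.parse()?)),
            None => None,
        },
        "STRING" => field.raw.clone().map(Cell::String),
        other => return Err(FieldError::Unsupported(other.to_string())),
    })
}
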
46 changes: 46 additions & 0 deletions wrappers/src/fdw/bigquery_fdw/mod.rs
@@ -1,3 +1,49 @@
#![allow(clippy::module_inception)]

use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::model::field_type::FieldType;
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::{DateTimeConversionError, PgSqlErrorCode};
use supabase_wrappers::prelude::{CreateRuntimeError, OptionsError};
use thiserror::Error;

mod bigquery_fdw;
mod tests;

#[derive(Error, Debug)]
enum BigQueryFdwError {
#[error("{0}")]
CreateRuntimeError(#[from] CreateRuntimeError),

#[error("{0}")]
OptionsError(#[from] OptionsError),

#[error("big query error: {0}")]
BigQueryError(#[from] BQError),

#[error("field type {0:?} not supported")]
UnsupportedFieldType(FieldType),

#[error("{0}")]
NumericConversionError(#[from] pgrx::numeric::Error),

#[error("{0}")]
DateTimeConversionError(#[from] DateTimeConversionError),
}

impl From<BigQueryFdwError> for ErrorReport {
fn from(value: BigQueryFdwError) -> Self {
match value {
BigQueryFdwError::CreateRuntimeError(e) => e.into(),
BigQueryFdwError::OptionsError(e) => e.into(),
BigQueryFdwError::BigQueryError(e) => ErrorReport::new(
PgSqlErrorCode::ERRCODE_FDW_INVALID_DATA_TYPE,
format!("{e}"),
"",
),
_ => ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{value}"), ""),
}
}
}

type BigQueryFdwResult<T> = Result<T, BigQueryFdwError>;
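
How such an error enum surfaces as a Postgres-visible report can be sketched independently of pgrx; `Report` below is a hypothetical stand-in for `ErrorReport`, pairing an SQLSTATE-style code with the error's `Display` output:

use thiserror::Error;

// Hypothetical stand-in for pgrx's ErrorReport: an error code plus a message.
struct Report {
    code: &'static str,
    message: String,
}

#[derive(Error, Debug)]
enum FdwError {
    #[error("big query error: {0}")]
    BigQuery(String),
    #[error("field type {0} not supported")]
    UnsupportedFieldType(String),
}

// Mirrors the shape of the From<BigQueryFdwError> for ErrorReport impl above:
// choose a code per variant and reuse the thiserror Display string as the message.
impl From<FdwError> for Report {
    fn from(value: FdwError) -> Self {
        let code = match &value {
            FdwError::BigQuery(_) => "FDW_INVALID_DATA_TYPE",
            FdwError::UnsupportedFieldType(_) => "FDW_ERROR",
        };
        Report {
            code,
            message: value.to_string(),
        }
    }
}

fn main() {
    let report: Report = FdwError::UnsupportedFieldType("GEOGRAPHY".into()).into();
    println!("{}: {}", report.code, report.message);
}
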
