From 3fb0974347949d1f8a952af6d0f1b852d7c3e701 Mon Sep 17 00:00:00 2001 From: Bo Lu Date: Mon, 18 Sep 2023 00:30:51 +1000 Subject: [PATCH] chore(stripe_fdw): refactor error reporting --- wrappers/Cargo.toml | 4 +- wrappers/src/fdw/stripe_fdw/mod.rs | 38 +++ wrappers/src/fdw/stripe_fdw/stripe_fdw.rs | 315 +++++++++------------- 3 files changed, 171 insertions(+), 186 deletions(-) diff --git a/wrappers/Cargo.toml b/wrappers/Cargo.toml index 90de9c8d..3522bb37 100644 --- a/wrappers/Cargo.toml +++ b/wrappers/Cargo.toml @@ -19,7 +19,7 @@ pg_test = [] helloworld_fdw = [] bigquery_fdw = ["gcp-bigquery-client", "serde_json", "serde", "wiremock", "futures", "yup-oauth2", "thiserror"] clickhouse_fdw = ["clickhouse-rs", "chrono", "chrono-tz", "regex", "thiserror"] -stripe_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "thiserror"] +stripe_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "thiserror", "url"] firebase_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "yup-oauth2", "regex", "thiserror"] s3_fdw = [ "reqwest", "reqwest-middleware", "reqwest-retry", "aws-config", "aws-sdk-s3", @@ -58,7 +58,7 @@ reqwest-retry = { version = "0.2.2", optional = true } yup-oauth2 = { version = "8.0.0", optional = true } regex = { version = "1", optional = true } -# for airtable_fdw +# for airtable_fdw, stripe_fdw url = { version = "2.3", optional = true } # for s3_fdw diff --git a/wrappers/src/fdw/stripe_fdw/mod.rs b/wrappers/src/fdw/stripe_fdw/mod.rs index a93c9685..13ca7cc0 100644 --- a/wrappers/src/fdw/stripe_fdw/mod.rs +++ b/wrappers/src/fdw/stripe_fdw/mod.rs @@ -1,3 +1,41 @@ #![allow(clippy::module_inception)] mod stripe_fdw; mod tests; + +use pgrx::pg_sys::panic::ErrorReport; +use pgrx::prelude::PgSqlErrorCode; +use thiserror::Error; + +use supabase_wrappers::prelude::{CreateRuntimeError, OptionsError}; + +#[derive(Error, Debug)] +enum StripeFdwError { + #[error("column '{0}' data type is not supported")] + UnsupportedColumnType(String), + + #[error("Stripe object '{0}' not implemented")] + ObjectNotImplemented(String), + + #[error("{0}")] + OptionsError(#[from] OptionsError), + + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), + + #[error("parse url failed: {0}")] + UrlParseError(#[from] url::ParseError), + + #[error("request failed: {0}")] + RequestError(#[from] reqwest_middleware::Error), + + #[error("parse JSON response failed: {0}")] + JsonParseError(#[from] serde_json::Error), +} + +impl From for ErrorReport { + fn from(value: StripeFdwError) -> Self { + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{value}"), "") + } +} + +type StripeFdwResult = Result; diff --git a/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs b/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs index 0785622f..7eb4d6c9 100644 --- a/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs +++ b/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs @@ -1,6 +1,5 @@ use crate::stats; -use pgrx::pg_sys::panic::ErrorReport; -use pgrx::{datum::datetime_support::to_timestamp, pg_sys, prelude::PgSqlErrorCode, JsonB}; +use pgrx::{datum::datetime_support::to_timestamp, pg_sys, JsonB}; use reqwest::{self, header, StatusCode, Url}; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; @@ -8,7 +7,8 @@ use serde_json::{json, Map as JsonMap, Number, Value as JsonValue}; use std::collections::HashMap; use supabase_wrappers::prelude::*; -use thiserror::Error; + +use super::{StripeFdwError, StripeFdwResult}; fn create_client(api_key: &str) -> ClientWithMiddleware { let mut headers = header::HeaderMap::new(); @@ -30,9 +30,9 @@ fn body_to_rows( resp_body: &str, normal_cols: Vec<(&str, &str)>, tgt_cols: &[Column], -) -> (Vec, Option, Option) { +) -> StripeFdwResult<(Vec, Option, Option)> { let mut result = Vec::new(); - let value: JsonValue = serde_json::from_str(resp_body).unwrap(); + let value: JsonValue = serde_json::from_str(resp_body)?; let is_list = value .as_object() .and_then(|v| v.get("object")) @@ -107,7 +107,7 @@ fn body_to_rows( row.push(col_name, cell); } else if &tgt_col.name == "attrs" { // put all properties into 'attrs' JSON column - let attrs = serde_json::from_str(&obj.to_string()).unwrap(); + let attrs = serde_json::from_str(&obj.to_string())?; row.push("attrs", Some(Cell::Json(JsonB(attrs)))); } } @@ -130,10 +130,10 @@ fn body_to_rows( .and_then(|v| v.get("has_more")) .and_then(|v| v.as_bool()); - (result, cursor, has_more) + Ok((result, cursor, has_more)) } -fn row_to_body(row: &Row) -> JsonValue { +fn row_to_body(row: &Row) -> StripeFdwResult { let mut map = JsonMap::new(); for (col_name, cell) in row.iter() { @@ -158,17 +158,16 @@ fn row_to_body(row: &Row) -> JsonValue { } } _ => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_INVALID_DATA_TYPE, - &format!("field type {:?} not supported", cell), - ); - return JsonValue::Null; + return Err(StripeFdwError::UnsupportedColumnType(format!( + "{:?}", + col_name + ))); } } } } - JsonValue::Object(map) + Ok(JsonValue::Object(map)) } fn pushdown_quals( @@ -244,17 +243,8 @@ fn inc_stats_request_cnt(stats_metadata: &mut JsonB) { }; } -macro_rules! report_request_error { - ($err:ident) => {{ - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("request failed: {}", $err), - ); - }}; -} - #[wrappers_fdw( - version = "0.1.7", + version = "0.1.8", author = "Supabase", website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/stripe_fdw", error_type = "StripeFdwError" @@ -277,8 +267,8 @@ impl StripeFdw { quals: &[Qual], page_size: i64, cursor: &Option, - ) -> Option { - let mut url = self.base_url.join(obj).unwrap(); + ) -> StripeFdwResult> { + let mut url = self.base_url.join(obj)?; // pushdown quals other than id // ref: https://stripe.com/docs/api/[object]/list @@ -307,16 +297,12 @@ impl StripeFdw { "transfers" => vec!["destination"], "checkout/sessions" => vec!["customer", "payment_intent", "subscription"], _ => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_TABLE_NOT_FOUND, - &format!("'{}' object is not implemented", obj), - ); - return None; + return Err(StripeFdwError::ObjectNotImplemented(obj.to_string())); } }; pushdown_quals(&mut url, obj, quals, fields, page_size, cursor); - Some(url) + Ok(Some(url)) } // convert response body text to rows @@ -325,7 +311,7 @@ impl StripeFdw { obj: &str, resp_body: &str, tgt_cols: &[Column], - ) -> (Vec, Option, Option) { + ) -> StripeFdwResult<(Vec, Option, Option)> { match obj { "accounts" => body_to_rows( resp_body, @@ -616,36 +602,13 @@ impl StripeFdw { ], tgt_cols, ), - _ => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_TABLE_NOT_FOUND, - &format!("'{}' object is not implemented", obj), - ); - (Vec::new(), None, None) - } - } - } -} - -#[derive(Error, Debug)] -enum StripeFdwError { - #[error("{0}")] - CreateRuntimeError(#[from] CreateRuntimeError), - #[error("{0}")] - OptionsError(#[from] OptionsError), -} - -impl From for ErrorReport { - fn from(value: StripeFdwError) -> Self { - match value { - StripeFdwError::CreateRuntimeError(e) => e.into(), - StripeFdwError::OptionsError(e) => e.into(), + _ => Err(StripeFdwError::ObjectNotImplemented(obj.to_string())), } } } impl ForeignDataWrapper for StripeFdw { - fn new(options: &HashMap) -> Result { + fn new(options: &HashMap) -> StripeFdwResult { let base_url = options .get("api_url") .map(|t| t.to_owned()) @@ -663,7 +626,7 @@ impl ForeignDataWrapper for StripeFdw { Some(api_key) => Some(create_client(api_key)), None => { let key_id = require_option("api_key_id", options)?; - get_vault_secret(&key_id).map(|api_key| create_client(&api_key)) + get_vault_secret(key_id).map(|api_key| create_client(&api_key)) } }; @@ -671,7 +634,7 @@ impl ForeignDataWrapper for StripeFdw { Ok(StripeFdw { rt: create_async_runtime()?, - base_url: Url::parse(&base_url).unwrap(), + base_url: Url::parse(&base_url)?, client, scan_result: None, obj: String::default(), @@ -686,7 +649,7 @@ impl ForeignDataWrapper for StripeFdw { _sorts: &[Sort], limit: &Option, options: &HashMap, - ) -> Result<(), StripeFdwError> { + ) -> StripeFdwResult<()> { let obj = require_option("object", options)?; if let Some(client) = &self.client { @@ -707,58 +670,51 @@ impl ForeignDataWrapper for StripeFdw { while page < page_cnt { // build url - let url = self.build_url(&obj, quals, page_size, &cursor); + let url = self.build_url(obj, quals, page_size, &cursor)?; if url.is_none() { return Ok(()); } let url = url.unwrap(); - // make api call inc_stats_request_cnt(&mut stats_metadata); - match self.rt.block_on(client.get(url).send()) { - Ok(resp) => { - stats::inc_stats( - Self::FDW_NAME, - stats::Metric::BytesIn, - resp.content_length().unwrap_or(0) as i64, - ); - - if resp.status() == StatusCode::NOT_FOUND { - // if it is 404 error, we should treat it as an empty - // result rather than a request error - break; - } - match resp.error_for_status() { - Ok(resp) => { - let body = self.rt.block_on(resp.text()).unwrap(); - let (rows, starting_after, has_more) = - self.resp_to_rows(&obj, &body, columns); - if rows.is_empty() { - break; - } - result.extend(rows); - match has_more { - Some(has_more) => { - if !has_more { - break; - } - } - None => break, - } - cursor = starting_after; - } - Err(err) => { - report_request_error!(err); - return Ok(()); - } - } + // make api call + let body = self.rt.block_on(client.get(url).send()).and_then(|resp| { + stats::inc_stats( + Self::FDW_NAME, + stats::Metric::BytesIn, + resp.content_length().unwrap_or(0) as i64, + ); + + if resp.status() == StatusCode::NOT_FOUND { + // if it is 404 error, we should treat it as an empty + // result rather than a request error + return Ok(String::new()); } - Err(err) => { - report_request_error!(err); - return Ok(()); + + resp.error_for_status() + .and_then(|resp| self.rt.block_on(resp.text())) + .map_err(reqwest_middleware::Error::from) + })?; + if body.is_empty() { + break; + } + + // convert response body to rows + let (rows, starting_after, has_more) = self.resp_to_rows(obj, &body, columns)?; + if rows.is_empty() { + break; + } + result.extend(rows); + match has_more { + Some(has_more) => { + if !has_more { + break; + } } + None => break, } + cursor = starting_after; page += 1; } @@ -774,7 +730,7 @@ impl ForeignDataWrapper for StripeFdw { Ok(()) } - fn iter_scan(&mut self, row: &mut Row) -> Result, StripeFdwError> { + fn iter_scan(&mut self, row: &mut Row) -> StripeFdwResult> { if let Some(ref mut result) = self.scan_result { if !result.is_empty() { return Ok(result @@ -786,52 +742,49 @@ impl ForeignDataWrapper for StripeFdw { Ok(None) } - fn end_scan(&mut self) -> Result<(), StripeFdwError> { + fn end_scan(&mut self) -> StripeFdwResult<()> { self.scan_result.take(); Ok(()) } - fn begin_modify(&mut self, options: &HashMap) -> Result<(), StripeFdwError> { + fn begin_modify(&mut self, options: &HashMap) -> StripeFdwResult<()> { self.obj = require_option("object", options)?.to_string(); self.rowid_col = require_option("rowid_column", options)?.to_string(); Ok(()) } - fn insert(&mut self, src: &Row) -> Result<(), StripeFdwError> { + fn insert(&mut self, src: &Row) -> StripeFdwResult<()> { if let Some(ref mut client) = self.client { let url = self.base_url.join(&self.obj).unwrap(); - let body = row_to_body(src); + let body = row_to_body(src)?; if body.is_null() { return Ok(()); } let mut stats_metadata = get_stats_metadata(); - // call Stripe API inc_stats_request_cnt(&mut stats_metadata); - match self.rt.block_on(client.post(url).form(&body).send()) { - Ok(resp) => match resp.error_for_status() { - Ok(resp) => { - stats::inc_stats( - Self::FDW_NAME, - stats::Metric::BytesIn, - resp.content_length().unwrap_or(0) as i64, - ); - let body = self.rt.block_on(resp.text()).unwrap(); - let json: JsonValue = serde_json::from_str(&body).unwrap(); - if let Some(id) = json.get("id").and_then(|v| v.as_str()) { - report_info(&format!("inserted {} {}", self.obj, id)); - } - } - Err(err) => { - report_request_error!(err); - return Ok(()); - } - }, - Err(err) => { - report_request_error!(err); - return Ok(()); - } + + // call Stripe API + let body = self + .rt + .block_on(client.post(url).form(&body).send()) + .and_then(|resp| { + resp.error_for_status() + .and_then(|resp| { + stats::inc_stats( + Self::FDW_NAME, + stats::Metric::BytesIn, + resp.content_length().unwrap_or(0) as i64, + ); + self.rt.block_on(resp.text()) + }) + .map_err(reqwest_middleware::Error::from) + })?; + + let json: JsonValue = serde_json::from_str(&body)?; + if let Some(id) = json.get("id").and_then(|v| v.as_str()) { + report_info(&format!("inserted {} {}", self.obj, id)); } set_stats_metadata(stats_metadata); @@ -839,7 +792,7 @@ impl ForeignDataWrapper for StripeFdw { Ok(()) } - fn update(&mut self, rowid: &Cell, new_row: &Row) -> Result<(), StripeFdwError> { + fn update(&mut self, rowid: &Cell, new_row: &Row) -> StripeFdwResult<()> { if let Some(ref mut client) = self.client { let mut stats_metadata = get_stats_metadata(); @@ -851,36 +804,33 @@ impl ForeignDataWrapper for StripeFdw { .unwrap() .join(rowid) .unwrap(); - let body = row_to_body(new_row); + let body = row_to_body(new_row)?; if body.is_null() { return Ok(()); } - // call Stripe API inc_stats_request_cnt(&mut stats_metadata); - match self.rt.block_on(client.post(url).form(&body).send()) { - Ok(resp) => match resp.error_for_status() { - Ok(resp) => { - stats::inc_stats( - Self::FDW_NAME, - stats::Metric::BytesIn, - resp.content_length().unwrap_or(0) as i64, - ); - let body = self.rt.block_on(resp.text()).unwrap(); - let json: JsonValue = serde_json::from_str(&body).unwrap(); - if let Some(id) = json.get("id").and_then(|v| v.as_str()) { - report_info(&format!("updated {} {}", self.obj, id)); - } - } - Err(err) => { - report_request_error!(err); - return Ok(()); - } - }, - Err(err) => { - report_request_error!(err); - return Ok(()); - } + + // call Stripe API + let body = self + .rt + .block_on(client.post(url).form(&body).send()) + .and_then(|resp| { + resp.error_for_status() + .and_then(|resp| { + stats::inc_stats( + Self::FDW_NAME, + stats::Metric::BytesIn, + resp.content_length().unwrap_or(0) as i64, + ); + self.rt.block_on(resp.text()) + }) + .map_err(reqwest_middleware::Error::from) + })?; + + let json: JsonValue = serde_json::from_str(&body)?; + if let Some(id) = json.get("id").and_then(|v| v.as_str()) { + report_info(&format!("updated {} {}", self.obj, id)); } } _ => unreachable!(), @@ -891,7 +841,7 @@ impl ForeignDataWrapper for StripeFdw { Ok(()) } - fn delete(&mut self, rowid: &Cell) -> Result<(), StripeFdwError> { + fn delete(&mut self, rowid: &Cell) -> StripeFdwResult<()> { if let Some(ref mut client) = self.client { let mut stats_metadata = get_stats_metadata(); @@ -904,31 +854,28 @@ impl ForeignDataWrapper for StripeFdw { .join(rowid) .unwrap(); - // call Stripe API inc_stats_request_cnt(&mut stats_metadata); - match self.rt.block_on(client.delete(url).send()) { - Ok(resp) => match resp.error_for_status() { - Ok(resp) => { - stats::inc_stats( - Self::FDW_NAME, - stats::Metric::BytesIn, - resp.content_length().unwrap_or(0) as i64, - ); - let body = self.rt.block_on(resp.text()).unwrap(); - let json: JsonValue = serde_json::from_str(&body).unwrap(); - if let Some(id) = json.get("id").and_then(|v| v.as_str()) { - report_info(&format!("deleted {} {}", self.obj, id)); - } - } - Err(err) => { - report_request_error!(err); - return Ok(()); - } - }, - Err(err) => { - report_request_error!(err); - return Ok(()); - } + + // call Stripe API + let body = self + .rt + .block_on(client.delete(url).send()) + .and_then(|resp| { + resp.error_for_status() + .and_then(|resp| { + stats::inc_stats( + Self::FDW_NAME, + stats::Metric::BytesIn, + resp.content_length().unwrap_or(0) as i64, + ); + self.rt.block_on(resp.text()) + }) + .map_err(reqwest_middleware::Error::from) + })?; + + let json: JsonValue = serde_json::from_str(&body)?; + if let Some(id) = json.get("id").and_then(|v| v.as_str()) { + report_info(&format!("deleted {} {}", self.obj, id)); } } _ => unreachable!(), @@ -942,7 +889,7 @@ impl ForeignDataWrapper for StripeFdw { fn validator( options: Vec>, catalog: Option, - ) -> Result<(), StripeFdwError> { + ) -> StripeFdwResult<()> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { check_options_contain(&options, "object")?;