diff --git a/wrappers/Cargo.toml b/wrappers/Cargo.toml index 4445c68f..11704c19 100644 --- a/wrappers/Cargo.toml +++ b/wrappers/Cargo.toml @@ -27,7 +27,7 @@ s3_fdw = [ "http", "parquet", "futures", "arrow-array", "chrono", "thiserror" ] airtable_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "serde", "url", "thiserror"] -logflare_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "thiserror"] +logflare_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "thiserror", "url"] # Does not include helloworld_fdw because of its general uselessness all_fdws = ["airtable_fdw", "bigquery_fdw", "clickhouse_fdw", "stripe_fdw", "firebase_fdw", "s3_fdw", "logflare_fdw"] diff --git a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs index 5e6647e3..d0b7f8ab 100644 --- a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs +++ b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs @@ -1,8 +1,7 @@ use crate::stats; -use pgrx::pg_sys::panic::ErrorReport; use pgrx::{ pg_sys, - prelude::{AnyNumeric, Date, PgSqlErrorCode, Timestamp}, + prelude::{AnyNumeric, Date, Timestamp}, }; use reqwest::{ self, @@ -16,7 +15,8 @@ use std::collections::HashMap; use std::str::FromStr; use supabase_wrappers::prelude::*; -use thiserror::Error; + +use super::{LogflareFdwError, LogflareFdwResult}; fn create_client(api_key: &str) -> ClientWithMiddleware { let mut headers = HeaderMap::new(); @@ -34,16 +34,6 @@ fn create_client(api_key: &str) -> ClientWithMiddleware { .build() } -macro_rules! report_request_error { - ($err:ident) => {{ - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("request failed: {}", $err), - ); - return Ok(()); - }}; -} - fn extract_params(quals: &[Qual]) -> Option> { let params = quals .iter() @@ -58,73 +48,45 @@ fn extract_params(quals: &[Qual]) -> Option> { Some(params) } -macro_rules! type_mismatch { - ($col:ident) => { - panic!("column '{}' data type not match", $col.name) - }; -} - -fn json_value_to_cell(tgt_col: &Column, v: &JsonValue) -> Cell { +fn json_value_to_cell(tgt_col: &Column, v: &JsonValue) -> LogflareFdwResult { match tgt_col.type_oid { - pg_sys::BOOLOID => Cell::Bool(v.as_bool().unwrap_or_else(|| type_mismatch!(tgt_col))), - pg_sys::CHAROID => Cell::I8( - v.as_i64() - .map(|s| i8::try_from(s).unwrap_or_else(|_| type_mismatch!(tgt_col))) - .unwrap_or_else(|| type_mismatch!(tgt_col)), - ), - pg_sys::INT2OID => Cell::I16( - v.as_i64() - .map(|s| i16::try_from(s).unwrap_or_else(|_| type_mismatch!(tgt_col))) - .unwrap_or_else(|| type_mismatch!(tgt_col)), - ), - pg_sys::FLOAT4OID => Cell::F32( - v.as_f64() - .map(|s| s as f32) - .unwrap_or_else(|| type_mismatch!(tgt_col)), - ), - pg_sys::INT4OID => Cell::I32( - v.as_i64() - .map(|s| i32::try_from(s).unwrap_or_else(|_| type_mismatch!(tgt_col))) - .unwrap_or_else(|| type_mismatch!(tgt_col)), - ), - pg_sys::FLOAT8OID => Cell::F64(v.as_f64().unwrap_or_else(|| type_mismatch!(tgt_col))), - pg_sys::INT8OID => Cell::I64(v.as_i64().unwrap_or_else(|| type_mismatch!(tgt_col))), - pg_sys::NUMERICOID => Cell::Numeric( - v.as_f64() - .map(|s| AnyNumeric::try_from(s).unwrap_or_else(|_| type_mismatch!(tgt_col))) - .unwrap_or_else(|| type_mismatch!(tgt_col)), - ), - pg_sys::TEXTOID => Cell::String( - v.as_str() - .map(|s| s.to_owned()) - .unwrap_or_else(|| type_mismatch!(tgt_col)), - ), - pg_sys::DATEOID => Cell::Date( - v.as_str() - .map(|s| Date::from_str(s).unwrap_or_else(|_| type_mismatch!(tgt_col))) - .unwrap_or_else(|| type_mismatch!(tgt_col)), - ), - pg_sys::TIMESTAMPOID => Cell::Timestamp( - v.as_str() - .map(|s| Timestamp::from_str(s).unwrap_or_else(|_| type_mismatch!(tgt_col))) - .unwrap_or_else(|| type_mismatch!(tgt_col)), - ), + pg_sys::BOOLOID => v.as_bool().map(Cell::Bool), + pg_sys::CHAROID => v.as_i64().and_then(|s| i8::try_from(s).ok()).map(Cell::I8), + pg_sys::INT2OID => v + .as_i64() + .and_then(|s| i16::try_from(s).ok()) + .map(Cell::I16), + pg_sys::FLOAT4OID => v.as_f64().map(|s| s as f32).map(Cell::F32), + pg_sys::INT4OID => v + .as_i64() + .and_then(|s| i32::try_from(s).ok()) + .map(Cell::I32), + pg_sys::FLOAT8OID => v.as_f64().map(Cell::F64), + pg_sys::INT8OID => v.as_i64().map(Cell::I64), + pg_sys::NUMERICOID => v + .as_f64() + .and_then(|s| AnyNumeric::try_from(s).ok()) + .map(Cell::Numeric), + pg_sys::TEXTOID => v.as_str().map(|s| s.to_owned()).map(Cell::String), + pg_sys::DATEOID => v + .as_str() + .and_then(|s| Date::from_str(s).ok()) + .map(Cell::Date), + pg_sys::TIMESTAMPOID => v + .as_str() + .and_then(|s| Timestamp::from_str(s).ok()) + .map(Cell::Timestamp), _ => { - // report error and return a dummy cell - report_error( - PgSqlErrorCode::ERRCODE_FDW_INVALID_DATA_TYPE, - &format!( - "column '{}' data type oid '{}' is not supported", - tgt_col.name, tgt_col.type_oid - ), - ); - Cell::Bool(false) + return Err(LogflareFdwError::UnsupportedColumnType( + tgt_col.name.clone(), + )) } } + .ok_or(LogflareFdwError::ColumnTypeNotMatch(tgt_col.name.clone())) } #[wrappers_fdw( - version = "0.1.0", + version = "0.1.1", author = "Supabase", website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/logflare_fdw", error_type = "LogflareFdwError" @@ -133,7 +95,7 @@ pub(crate) struct LogflareFdw { rt: Runtime, base_url: Url, client: Option, - scan_result: Option>, + scan_result: Vec, params: Vec, } @@ -141,19 +103,16 @@ impl LogflareFdw { const FDW_NAME: &str = "LogflareFdw"; const BASE_URL: &str = "https://api.logflare.app/api/endpoints/query/"; - fn build_url(&self, endpoint: &str) -> Option { - let mut url = self.base_url.join(endpoint).unwrap(); + fn build_url(&self, endpoint: &str) -> LogflareFdwResult> { + let mut url = self.base_url.join(endpoint)?; for param in &self.params { // extract actual param name, e.g. "_param_foo" => "foo" let param_name = ¶m.field[7..]; if param.operator != "=" { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("Parameter '{}' only supports '=' operator", param_name), - ); - return None; + return Err(LogflareFdwError::NoEqualParameter(param_name.to_string())); } + match ¶m.value { Value::Cell(cell) => { let value = match cell { @@ -165,28 +124,31 @@ impl LogflareFdw { url.query_pairs_mut().append_pair(param_name, &value); } Value::Array(_) => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("Parameter '{}' doesn't supports array value", param_name), - ); - return None; + return Err(LogflareFdwError::NoArrayParameter(param_name.to_string())) } } } - Some(url) + + Ok(Some(url)) } - fn resp_to_rows(&mut self, body: &JsonValue, tgt_cols: &[Column]) -> Option> { + fn resp_to_rows( + &mut self, + body: &JsonValue, + tgt_cols: &[Column], + ) -> LogflareFdwResult> { body.as_object() .and_then(|v| v.get("result")) .and_then(|v| v.as_array()) - .map(|arr| { + .ok_or(LogflareFdwError::InvalidResponse(body.to_string())) + .and_then(|arr| { arr.iter() .map(|record| { let mut row = Row::new(); + if let Some(r) = record.as_object() { for tgt_col in tgt_cols { - let cell = if tgt_col.name == "_result" { + let cell: Option = if tgt_col.name == "_result" { // add _result meta cell Some(Cell::String(record.to_string())) } else if tgt_col.name.starts_with("_param_") { @@ -204,37 +166,28 @@ impl LogflareFdw { }) } else { // add normal cell - r.get(&tgt_col.name).map(|v| json_value_to_cell(tgt_col, v)) + if let Some(s) = r.get(&tgt_col.name) { + match json_value_to_cell(tgt_col, s) { + Ok(cell) => Some(cell), + Err(err) => return Err(err), + } + } else { + None + } }; row.push(&tgt_col.name, cell); } } - row + + Ok(row) }) - .collect::>() + .collect() }) } } -#[derive(Error, Debug)] -enum LogflareFdwError { - #[error("{0}")] - CreateRuntimeError(#[from] CreateRuntimeError), - #[error("{0}")] - OptionsError(#[from] OptionsError), -} - -impl From for ErrorReport { - fn from(value: LogflareFdwError) -> Self { - match value { - LogflareFdwError::CreateRuntimeError(e) => e.into(), - LogflareFdwError::OptionsError(e) => e.into(), - } - } -} - impl ForeignDataWrapper for LogflareFdw { - fn new(options: &HashMap) -> Result { + fn new(options: &HashMap) -> LogflareFdwResult { let base_url = options .get("api_url") .map(|t| t.to_owned()) @@ -250,7 +203,7 @@ impl ForeignDataWrapper for LogflareFdw { Some(api_key) => Some(create_client(api_key)), None => { let key_id = require_option("api_key_id", options)?; - get_vault_secret(&key_id).map(|api_key| create_client(&api_key)) + get_vault_secret(key_id).map(|api_key| create_client(&api_key)) } }; @@ -258,9 +211,9 @@ impl ForeignDataWrapper for LogflareFdw { Ok(LogflareFdw { rt: create_async_runtime()?, - base_url: Url::parse(&base_url).unwrap(), + base_url: Url::parse(&base_url)?, client, - scan_result: None, + scan_result: Vec::default(), params: Vec::default(), }) } @@ -272,7 +225,7 @@ impl ForeignDataWrapper for LogflareFdw { _sorts: &[Sort], _limit: &Option, options: &HashMap, - ) -> Result<(), LogflareFdwError> { + ) -> LogflareFdwResult<()> { let endpoint = require_option("endpoint", options)?; // extract params @@ -284,76 +237,65 @@ impl ForeignDataWrapper for LogflareFdw { if let Some(client) = &self.client { // build url - let url = self.build_url(&endpoint); + let url = self.build_url(endpoint)?; if url.is_none() { return Ok(()); } let url = url.unwrap(); // make api call - match self.rt.block_on(client.get(url).send()) { - Ok(resp) => { - stats::inc_stats( - Self::FDW_NAME, - stats::Metric::BytesIn, - resp.content_length().unwrap_or(0) as i64, - ); - - if resp.status() == StatusCode::NOT_FOUND { - // if it is 404 error, we should treat it as an empty - // result rather than a request error - return Ok(()); - } + let body: JsonValue = self.rt.block_on(client.get(url).send()).and_then(|resp| { + stats::inc_stats( + Self::FDW_NAME, + stats::Metric::BytesIn, + resp.content_length().unwrap_or(0) as i64, + ); - match resp.error_for_status() { - Ok(resp) => { - let body: JsonValue = self.rt.block_on(resp.json()).unwrap(); - let result = self.resp_to_rows(&body, columns); - if let Some(result) = &result { - stats::inc_stats( - Self::FDW_NAME, - stats::Metric::RowsIn, - result.len() as i64, - ); - stats::inc_stats( - Self::FDW_NAME, - stats::Metric::RowsOut, - result.len() as i64, - ); - } - self.scan_result = result; - } - Err(err) => report_request_error!(err), - } + if resp.status() == StatusCode::NOT_FOUND { + // if it is 404 error, we should treat it as an empty + // result rather than a request error + return Ok(JsonValue::Null); } - Err(err) => report_request_error!(err), + + resp.error_for_status() + .and_then(|resp| self.rt.block_on(resp.json())) + .map_err(reqwest_middleware::Error::from) + })?; + if body.is_null() { + return Ok(()); + } + + let result = self.resp_to_rows(&body, columns)?; + if !result.is_empty() { + stats::inc_stats(Self::FDW_NAME, stats::Metric::RowsIn, result.len() as i64); + stats::inc_stats(Self::FDW_NAME, stats::Metric::RowsOut, result.len() as i64); } + self.scan_result = result; } Ok(()) } - fn iter_scan(&mut self, row: &mut Row) -> Result, LogflareFdwError> { - if let Some(ref mut result) = self.scan_result { - if !result.is_empty() { - return Ok(result - .drain(0..1) - .last() - .map(|src_row| row.replace_with(src_row))); - } + fn iter_scan(&mut self, row: &mut Row) -> LogflareFdwResult> { + if self.scan_result.is_empty() { + Ok(None) + } else { + Ok(self + .scan_result + .drain(0..1) + .last() + .map(|src_row| row.replace_with(src_row))) } - Ok(None) } - fn end_scan(&mut self) -> Result<(), LogflareFdwError> { - self.scan_result.take(); + fn end_scan(&mut self) -> LogflareFdwResult<()> { Ok(()) } fn validator( options: Vec>, catalog: Option, - ) -> Result<(), LogflareFdwError> { + ) -> LogflareFdwResult<()> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { check_options_contain(&options, "endpoint")?; diff --git a/wrappers/src/fdw/logflare_fdw/mod.rs b/wrappers/src/fdw/logflare_fdw/mod.rs index 7739c761..69eedc8a 100644 --- a/wrappers/src/fdw/logflare_fdw/mod.rs +++ b/wrappers/src/fdw/logflare_fdw/mod.rs @@ -1,2 +1,49 @@ #![allow(clippy::module_inception)] mod logflare_fdw; + +use pgrx::pg_sys::panic::ErrorReport; +use pgrx::prelude::PgSqlErrorCode; +use thiserror::Error; + +use supabase_wrappers::prelude::{CreateRuntimeError, OptionsError}; + +#[derive(Error, Debug)] +enum LogflareFdwError { + #[error("parameter '{0}' only supports '=' operator")] + NoEqualParameter(String), + + #[error("parameter '{0}' doesn't supports array value")] + NoArrayParameter(String), + + #[error("column '{0}' data type is not supported")] + UnsupportedColumnType(String), + + #[error("column '{0}' data type not match")] + ColumnTypeNotMatch(String), + + #[error("invalid Logflare response: {0}")] + InvalidResponse(String), + + #[error("{0}")] + OptionsError(#[from] OptionsError), + + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), + + #[error("parse url failed: {0}")] + UrlParseError(#[from] url::ParseError), + + #[error("request failed: {0}")] + RequestError(#[from] reqwest_middleware::Error), + + #[error("parse JSON response failed: {0}")] + JsonParseError(#[from] serde_json::Error), +} + +impl From for ErrorReport { + fn from(value: LogflareFdwError) -> Self { + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{value}"), "") + } +} + +type LogflareFdwResult = Result;