From 9749b9bc105925801541d7f2b4251edeb46d06e9 Mon Sep 17 00:00:00 2001 From: Bo Lu Date: Fri, 22 Sep 2023 07:15:30 +0800 Subject: [PATCH] chore(clickhouse_fdw): refactor error reporting --- .../src/fdw/clickhouse_fdw/clickhouse_fdw.rs | 274 +++++++----------- wrappers/src/fdw/clickhouse_fdw/mod.rs | 38 +++ 2 files changed, 148 insertions(+), 164 deletions(-) diff --git a/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs b/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs index 3417c478..74cecf3b 100644 --- a/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs +++ b/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs @@ -2,77 +2,73 @@ use crate::stats; use chrono::{Date, DateTime, NaiveDate, NaiveDateTime, Utc}; use chrono_tz::Tz; use clickhouse_rs::{types, types::Block, types::SqlType, ClientHandle, Pool}; -use pgrx::pg_sys::panic::ErrorReport; -use pgrx::{prelude::PgSqlErrorCode, to_timestamp}; +use pgrx::to_timestamp; use regex::{Captures, Regex}; use std::collections::HashMap; use supabase_wrappers::prelude::*; -use thiserror::Error; -fn field_to_cell(row: &types::Row, i: usize) -> Option { - let sql_type = row.sql_type(i).unwrap(); +use super::{ClickHouseFdwError, ClickHouseFdwResult}; + +fn field_to_cell(row: &types::Row, i: usize) -> ClickHouseFdwResult> { + let sql_type = row.sql_type(i)?; match sql_type { SqlType::UInt8 => { // Bool is stored as UInt8 in ClickHouse, so we treat it as bool here - let value = row.get::(i).unwrap(); - Some(Cell::Bool(value != 0)) + let value = row.get::(i)?; + Ok(Some(Cell::Bool(value != 0))) } SqlType::Int16 => { - let value = row.get::(i).unwrap(); - Some(Cell::I16(value)) + let value = row.get::(i)?; + Ok(Some(Cell::I16(value))) } SqlType::UInt16 => { - let value = row.get::(i).unwrap(); - Some(Cell::I32(value as i32)) + let value = row.get::(i)?; + Ok(Some(Cell::I32(value as i32))) } SqlType::Int32 => { - let value = row.get::(i).unwrap(); - Some(Cell::I32(value)) + let value = row.get::(i)?; + Ok(Some(Cell::I32(value))) } SqlType::UInt32 => { - let value = row.get::(i).unwrap(); - Some(Cell::I64(value as i64)) + let value = row.get::(i)?; + Ok(Some(Cell::I64(value as i64))) } SqlType::Float32 => { - let value = row.get::(i).unwrap(); - Some(Cell::F32(value)) + let value = row.get::(i)?; + Ok(Some(Cell::F32(value))) } SqlType::Float64 => { - let value = row.get::(i).unwrap(); - Some(Cell::F64(value)) + let value = row.get::(i)?; + Ok(Some(Cell::F64(value))) } SqlType::UInt64 => { - let value = row.get::(i).unwrap(); - Some(Cell::I64(value as i64)) + let value = row.get::(i)?; + Ok(Some(Cell::I64(value as i64))) } SqlType::Int64 => { - let value = row.get::(i).unwrap(); - Some(Cell::I64(value)) + let value = row.get::(i)?; + Ok(Some(Cell::I64(value))) } SqlType::String => { - let value = row.get::(i).unwrap(); - Some(Cell::String(value)) + let value = row.get::(i)?; + Ok(Some(Cell::String(value))) } SqlType::Date => { - let value = row.get::, usize>(i).unwrap(); + let value = row.get::, usize>(i)?; let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); let seconds_from_epoch = value.naive_utc().signed_duration_since(epoch).num_seconds(); let ts = to_timestamp(seconds_from_epoch as f64); - Some(Cell::Date(pgrx::Date::from(ts))) + Ok(Some(Cell::Date(pgrx::Date::from(ts)))) } SqlType::DateTime(_) => { - let value = row.get::, usize>(i).unwrap(); + let value = row.get::, usize>(i)?; let ts = to_timestamp(value.timestamp() as f64); - Some(Cell::Timestamp(ts.to_utc())) - } - _ => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_INVALID_DATA_TYPE, - &format!("data type {} is not supported", sql_type.to_string()), - ); - None + Ok(Some(Cell::Timestamp(ts.to_utc()))) } + _ => Err(ClickHouseFdwError::UnsupportedColumnType( + sql_type.to_string().into(), + )), } } @@ -97,18 +93,27 @@ pub(crate) struct ClickHouseFdw { impl ClickHouseFdw { const FDW_NAME: &str = "ClickHouseFdw"; - fn create_client(&mut self) { + fn create_client(&mut self) -> ClickHouseFdwResult<()> { let pool = Pool::new(self.conn_str.as_str()); - self.client = self.rt.block_on(pool.get_handle()).map_or_else( - |err| { - report_error( - PgSqlErrorCode::ERRCODE_FDW_UNABLE_TO_ESTABLISH_CONNECTION, - &format!("connection failed: {}", err), - ); - None - }, - Some, - ); + self.client = Some(self.rt.block_on(pool.get_handle())?); + Ok(()) + } + + fn replace_all_params( + &mut self, + re: &Regex, + mut replacement: impl FnMut(&Captures) -> ClickHouseFdwResult, + ) -> ClickHouseFdwResult { + let mut new = String::with_capacity(self.table.len()); + let mut last_match = 0; + for caps in re.captures_iter(&self.table) { + let m = caps.get(0).unwrap(); + new.push_str(&self.table[last_match..m.start()]); + new.push_str(&replacement(&caps)?); + last_match = m.end(); + } + new.push_str(&self.table[last_match..]); + Ok(new) } fn deparse( @@ -117,35 +122,31 @@ impl ClickHouseFdw { columns: &[Column], sorts: &[Sort], limit: &Option, - ) -> String { + ) -> ClickHouseFdwResult { let table = if self.table.starts_with('(') { let re = Regex::new(r"\$\{(\w+)\}").unwrap(); - re.replace_all(&self.table, |caps: &Captures| { + let mut params = Vec::new(); + let mut replacement = |caps: &Captures| -> ClickHouseFdwResult { let param = &caps[1]; - match quals.iter().find(|&q| q.field == param) { - Some(qual) => { - self.params.push(qual.clone()); + for qual in quals.iter() { + if qual.field == param { + params.push(qual.clone()); match &qual.value { - Value::Cell(cell) => cell.to_string(), - Value::Array(_) => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - "invalid query parameter", - ); - String::default() + Value::Cell(cell) => return Ok(cell.to_string()), + Value::Array(arr) => { + return Err(ClickHouseFdwError::NoArrayParameter(format!( + "{:?}", + arr + ))) } } } - None => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("unmatched query parameter: {}", param), - ); - String::default() - } } - }) - .into_owned() + Err(ClickHouseFdwError::UnmatchedParameter(param.to_owned())) + }; + let s = self.replace_all_params(&re, &mut replacement)?; + self.params = params; + s } else { self.table.clone() }; @@ -195,35 +196,18 @@ impl ClickHouseFdw { sql.push_str(&format!(" limit {}", real_limit)); } - sql - } -} - -#[derive(Error, Debug)] -enum ClickHouseFdwError { - #[error("{0}")] - CreateRuntimeError(#[from] CreateRuntimeError), - #[error("{0}")] - OptionsError(#[from] OptionsError), -} - -impl From for ErrorReport { - fn from(value: ClickHouseFdwError) -> Self { - match value { - ClickHouseFdwError::CreateRuntimeError(e) => e.into(), - ClickHouseFdwError::OptionsError(e) => e.into(), - } + Ok(sql) } } impl ForeignDataWrapper for ClickHouseFdw { - fn new(options: &HashMap) -> Result { + fn new(options: &HashMap) -> ClickHouseFdwResult { let rt = create_async_runtime()?; let conn_str = match options.get("conn_string") { Some(conn_str) => conn_str.to_owned(), None => { let conn_str_id = require_option("conn_string_id", options)?; - get_vault_secret(&conn_str_id).unwrap_or_default() + get_vault_secret(conn_str_id).unwrap_or_default() } }; @@ -249,43 +233,36 @@ impl ForeignDataWrapper for ClickHouseFdw { sorts: &[Sort], limit: &Option, options: &HashMap, - ) -> Result<(), ClickHouseFdwError> { - self.create_client(); + ) -> ClickHouseFdwResult<()> { + self.create_client()?; self.table = require_option("table", options)?.to_string(); self.tgt_cols = columns.to_vec(); self.row_idx = 0; - let sql = self.deparse(quals, columns, sorts, limit); + let sql = self.deparse(quals, columns, sorts, limit)?; if let Some(ref mut client) = self.client { // for simplicity purpose, we fetch whole query result to local, // may need optimization in the future. - match self.rt.block_on(client.query(&sql).fetch_all()) { - Ok(block) => { - stats::inc_stats( - Self::FDW_NAME, - stats::Metric::RowsIn, - block.row_count() as i64, - ); - stats::inc_stats( - Self::FDW_NAME, - stats::Metric::RowsOut, - block.row_count() as i64, - ); - self.scan_blk = Some(block); - } - Err(err) => report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("query failed: {}", err), - ), - } + let block = self.rt.block_on(client.query(&sql).fetch_all())?; + stats::inc_stats( + Self::FDW_NAME, + stats::Metric::RowsIn, + block.row_count() as i64, + ); + stats::inc_stats( + Self::FDW_NAME, + stats::Metric::RowsOut, + block.row_count() as i64, + ); + self.scan_blk = Some(block); } Ok(()) } - fn iter_scan(&mut self, row: &mut Row) -> Result, ClickHouseFdwError> { + fn iter_scan(&mut self, row: &mut Row) -> ClickHouseFdwResult> { if let Some(block) = &self.scan_blk { let mut rows = block.rows(); @@ -304,7 +281,7 @@ impl ForeignDataWrapper for ClickHouseFdw { .enumerate() .find(|(_, c)| c.name() == tgt_col.name) .unwrap(); - let cell = field_to_cell(&src_row, i); + let cell = field_to_cell(&src_row, i)?; let col_name = src_row.name(i).unwrap(); if cell.as_ref().is_none() { return Ok(None); @@ -318,23 +295,20 @@ impl ForeignDataWrapper for ClickHouseFdw { Ok(None) } - fn end_scan(&mut self) -> Result<(), ClickHouseFdwError> { + fn end_scan(&mut self) -> ClickHouseFdwResult<()> { self.scan_blk.take(); Ok(()) } - fn begin_modify( - &mut self, - options: &HashMap, - ) -> Result<(), ClickHouseFdwError> { - self.create_client(); + fn begin_modify(&mut self, options: &HashMap) -> ClickHouseFdwResult<()> { + self.create_client()?; self.table = require_option("table", options)?.to_string(); self.rowid_col = require_option("rowid_column", options)?.to_string(); Ok(()) } - fn insert(&mut self, src: &Row) -> Result<(), ClickHouseFdwError> { + fn insert(&mut self, src: &Row) -> ClickHouseFdwResult<()> { if let Some(ref mut client) = self.client { let mut row = Vec::new(); for (col_name, cell) in src.iter() { @@ -347,52 +321,34 @@ impl ForeignDataWrapper for ClickHouseFdw { Cell::String(v) => row.push((col_name, types::Value::from(v.as_str()))), Cell::Date(_) => { let s = cell.to_string().replace('\'', ""); - if let Ok(tm) = NaiveDate::parse_from_str(&s, "%Y-%m-%d") { - let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); - let duration = tm - epoch; - let dt = types::Value::Date(duration.num_days() as u16, Tz::UTC); - row.push((col_name, dt)); - } else { - report_error( - PgSqlErrorCode::ERRCODE_FDW_INVALID_STRING_FORMAT, - &format!("invalid date format {}", s), - ); - } + let tm = NaiveDate::parse_from_str(&s, "%Y-%m-%d")?; + let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); + let duration = tm - epoch; + let dt = types::Value::Date(duration.num_days() as u16, Tz::UTC); + row.push((col_name, dt)); } Cell::Timestamp(_) => { let s = cell.to_string().replace('\'', ""); - if let Ok(tm) = NaiveDateTime::parse_from_str(&s, "%Y-%m-%d %H:%M:%S") { - let tm: DateTime = DateTime::from_utc(tm, Utc); - row.push((col_name, types::Value::from(tm))); - } else { - report_error( - PgSqlErrorCode::ERRCODE_FDW_INVALID_STRING_FORMAT, - &format!("invalid timestamp format {}", s), - ); - } + let tm = NaiveDateTime::parse_from_str(&s, "%Y-%m-%d %H:%M:%S")?; + let tm: DateTime = DateTime::from_utc(tm, Utc); + row.push((col_name, types::Value::from(tm))); + } + _ => { + return Err(ClickHouseFdwError::UnsupportedColumnType(cell.to_string())) } - _ => report_error( - PgSqlErrorCode::ERRCODE_FDW_INVALID_DATA_TYPE, - &format!("field type {:?} not supported", cell), - ), } } } let mut block = Block::new(); - block.push(row).unwrap(); + block.push(row)?; // execute query on ClickHouse - if let Err(err) = self.rt.block_on(client.insert(&self.table, block)) { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("insert failed: {}", err), - ); - } + self.rt.block_on(client.insert(&self.table, block))?; } Ok(()) } - fn update(&mut self, rowid: &Cell, new_row: &Row) -> Result<(), ClickHouseFdwError> { + fn update(&mut self, rowid: &Cell, new_row: &Row) -> ClickHouseFdwResult<()> { if let Some(ref mut client) = self.client { let mut sets = Vec::new(); for (col, cell) in new_row.iter() { @@ -414,17 +370,12 @@ impl ForeignDataWrapper for ClickHouseFdw { ); // execute query on ClickHouse - if let Err(err) = self.rt.block_on(client.execute(&sql)) { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("update failed: {}", err), - ); - } + self.rt.block_on(client.execute(&sql))?; } Ok(()) } - fn delete(&mut self, rowid: &Cell) -> Result<(), ClickHouseFdwError> { + fn delete(&mut self, rowid: &Cell) -> ClickHouseFdwResult<()> { if let Some(ref mut client) = self.client { let sql = format!( "alter table {} delete where {} = {}", @@ -432,12 +383,7 @@ impl ForeignDataWrapper for ClickHouseFdw { ); // execute query on ClickHouse - if let Err(err) = self.rt.block_on(client.execute(&sql)) { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("delete failed: {}", err), - ); - } + self.rt.block_on(client.execute(&sql))?; } Ok(()) } diff --git a/wrappers/src/fdw/clickhouse_fdw/mod.rs b/wrappers/src/fdw/clickhouse_fdw/mod.rs index c51b5eea..c58c6764 100644 --- a/wrappers/src/fdw/clickhouse_fdw/mod.rs +++ b/wrappers/src/fdw/clickhouse_fdw/mod.rs @@ -1,3 +1,41 @@ #![allow(clippy::module_inception)] mod clickhouse_fdw; mod tests; + +use pgrx::pg_sys::panic::ErrorReport; +use pgrx::prelude::PgSqlErrorCode; +use thiserror::Error; + +use supabase_wrappers::prelude::{CreateRuntimeError, OptionsError}; + +#[derive(Error, Debug)] +enum ClickHouseFdwError { + #[error("parameter '{0}' doesn't supports array value")] + NoArrayParameter(String), + + #[error("unmatched query parameter: {0}")] + UnmatchedParameter(String), + + #[error("column data type '{0}' is not supported")] + UnsupportedColumnType(String), + + #[error("datetime parse error: {0}")] + DatetimeParseError(#[from] chrono::format::ParseError), + + #[error("{0}")] + OptionsError(#[from] OptionsError), + + #[error("{0}")] + CreateRuntimeError(#[from] CreateRuntimeError), + + #[error("{0}")] + ClickHouseError(#[from] clickhouse_rs::errors::Error), +} + +impl From for ErrorReport { + fn from(value: ClickHouseFdwError) -> Self { + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{value}"), "") + } +} + +type ClickHouseFdwResult = Result;