Skip to content

Commit

Permalink
chore(s3_fdw): refactor error reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
burmecia committed Sep 18, 2023
1 parent 07db254 commit f0416ce
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 205 deletions.
2 changes: 2 additions & 0 deletions wrappers/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion wrappers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ clickhouse_fdw = ["clickhouse-rs", "chrono", "chrono-tz", "regex", "thiserror"]
stripe_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "thiserror"]
firebase_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "yup-oauth2", "regex", "thiserror"]
s3_fdw = [
"reqwest", "reqwest-middleware", "reqwest-retry", "aws-config", "aws-sdk-s3",
"reqwest", "reqwest-middleware", "reqwest-retry", "aws-config", "aws-sdk-s3", "aws-smithy-http", "aws-smithy-runtime-api",
"tokio", "tokio-util", "csv", "async-compression", "serde_json",
"http", "parquet", "futures", "arrow-array", "chrono", "thiserror"
]
Expand Down Expand Up @@ -64,6 +64,8 @@ url = { version = "2.3", optional = true }
# for s3_fdw
aws-config = { version = "0.56.1", optional = true }
aws-sdk-s3 = { version = "0.30.0", optional = true }
aws-smithy-http = { version = "0.56.1", optional = true }
aws-smithy-runtime-api = { version = "0.56.1", optional = true }
csv = { version = "1.2", optional = true }
tokio = { version = "1", features = ["full"], optional = true }
tokio-util = { version = "0.7", optional = true }
Expand Down
62 changes: 62 additions & 0 deletions wrappers/src/fdw/s3_fdw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,65 @@
mod parquet;
mod s3_fdw;
mod tests;

use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_smithy_http::result::SdkError;
use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::prelude::PgSqlErrorCode;
use thiserror::Error;

use supabase_wrappers::prelude::{CreateRuntimeError, OptionsError};

#[derive(Error, Debug)]
enum S3FdwError {
#[error("invalid s3 uri: {0}")]
InvalidS3Uri(String),

#[error("invalid format option: '{0}', it can only be 'csv', 'jsonl' or 'parquet'")]
InvalidFormatOption(String),

#[error("invalid compression option: {0}")]
InvalidCompressOption(String),

#[error("read line failed: {0}")]
ReadLineError(#[from] std::io::Error),

#[error("read csv record failed: {0}")]
ReadCsvError(#[from] csv::Error),

#[error("read parquet failed: {0}")]
ReadParquetError(#[from] ::parquet::errors::ParquetError),

#[error("column '{0}' data type is not supported")]
UnsupportedColumnType(String),

#[error("column '{0}' data type not match")]
ColumnTypeNotMatch(String),

#[error("column {0} not found in parquet file")]
ColumnNotFound(String),

#[error("{0}")]
OptionsError(#[from] OptionsError),

#[error("{0}")]
CreateRuntimeError(#[from] CreateRuntimeError),

#[error("parse uri failed: {0}")]
UriParseError(#[from] http::uri::InvalidUri),

#[error("request failed: {0}")]
RequestError(#[from] SdkError<GetObjectError, HttpResponse>),

#[error("parse JSON response failed: {0}")]
JsonParseError(#[from] serde_json::Error),
}

impl From<S3FdwError> for ErrorReport {
fn from(value: S3FdwError) -> Self {
ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{value}"), "")
}
}

type S3FdwResult<T> = Result<T, S3FdwError>;
102 changes: 42 additions & 60 deletions wrappers/src/fdw/s3_fdw/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use parquet::arrow::async_reader::{
use parquet::arrow::ProjectionMask;
use pgrx::datum::datetime_support::to_timestamp;
use pgrx::pg_sys;
use pgrx::prelude::{Date, PgSqlErrorCode};
use pgrx::prelude::Date;
use std::cmp::min;
use std::io::{Cursor, Error as IoError, ErrorKind, Result as IoResult, SeekFrom};
use std::pin::Pin;
Expand All @@ -19,6 +19,8 @@ use tokio::runtime::Handle;

use supabase_wrappers::prelude::*;

use super::{S3FdwError, S3FdwResult};

// convert an error to IO error
#[inline]
fn to_io_error(err: impl std::error::Error) -> IoError {
Expand Down Expand Up @@ -143,11 +145,12 @@ impl S3Parquet {
const FDW_NAME: &str = "S3Fdw";

// open batch stream from local buffer
pub(super) async fn open_local_stream(&mut self, buf: Vec<u8>) {
pub(super) async fn open_local_stream(&mut self, buf: Vec<u8>) -> S3FdwResult<()> {
let cursor: Box<dyn AsyncFileReader> = Box::new(Cursor::new(buf));
let builder = ParquetRecordBatchStreamBuilder::new(cursor).await.unwrap();
let builder = ParquetRecordBatchStreamBuilder::new(cursor).await?;
let stream = builder.build().unwrap();
self.stream = Some(stream);
Ok(())
}

// open async record batch stream
Expand All @@ -160,7 +163,7 @@ impl S3Parquet {
bucket: &str,
object: &str,
tgt_cols: &[Column],
) {
) -> S3FdwResult<()> {
let handle = Handle::current();
let rdr = S3ParquetReader::new(client, bucket, object);

Expand Down Expand Up @@ -201,72 +204,65 @@ impl S3Parquet {
let mask = ProjectionMask::roots(schema, project_indexes);
builder.with_projection(mask).build()
})
.map_err(to_io_error)
.unwrap();
.map_err(|err| parquet::errors::ParquetError::General(err.to_string()))?;

self.stream = Some(stream);
self.batch = None;
self.batch_idx = 0;

Ok(())
}

// refill record batch
pub(super) async fn refill(&mut self) -> Option<()> {
pub(super) async fn refill(&mut self) -> S3FdwResult<Option<()>> {
// if there are still records in the batch
if let Some(batch) = &self.batch {
if self.batch_idx < batch.num_rows() {
return Some(());
return Ok(Some(()));
}
}

// otherwise, read one moe batch
if let Some(ref mut stream) = &mut self.stream {
match stream.try_next().await {
Ok(result) => {
return result.map(|batch| {
stats::inc_stats(
Self::FDW_NAME,
stats::Metric::RowsIn,
batch.num_rows() as i64,
);
stats::inc_stats(
Self::FDW_NAME,
stats::Metric::BytesIn,
batch.get_array_memory_size() as i64,
);
let result = stream.try_next().await?;
return Ok(result.map(|batch| {
stats::inc_stats(
Self::FDW_NAME,
stats::Metric::RowsIn,
batch.num_rows() as i64,
);
stats::inc_stats(
Self::FDW_NAME,
stats::Metric::BytesIn,
batch.get_array_memory_size() as i64,
);

self.batch = Some(batch);
self.batch_idx = 0;
})
}
Err(err) => {
report_error(
PgSqlErrorCode::ERRCODE_FDW_ERROR,
&format!("read parquet record batch failed: {}", err),
);
return None;
}
}
self.batch = Some(batch);
self.batch_idx = 0;
}));
}

None
Ok(None)
}

// read one row from record batch
pub(super) fn read_into_row(&mut self, row: &mut Row, tgt_cols: &Vec<Column>) -> Option<()> {
pub(super) fn read_into_row(
&mut self,
row: &mut Row,
tgt_cols: &Vec<Column>,
) -> S3FdwResult<Option<()>> {
if let Some(batch) = &self.batch {
for tgt_col in tgt_cols {
let col = batch
.column_by_name(&tgt_col.name)
.unwrap_or_else(|| panic!("column {} not found in parquet file", tgt_col.name));
.ok_or(S3FdwError::ColumnNotFound(tgt_col.name.clone()))?;

macro_rules! col_to_cell {
($array_type:ident, $cell_type:ident) => {{
let arr = col
.as_any()
.downcast_ref::<array::$array_type>()
.unwrap_or_else(|| {
panic!("column '{}' data type not match", tgt_col.name)
});
.ok_or(S3FdwError::ColumnTypeNotMatch(tgt_col.name.clone()))?;
if arr.is_null(self.batch_idx) {
None
} else {
Expand All @@ -287,9 +283,7 @@ impl S3Parquet {
let arr = col
.as_any()
.downcast_ref::<array::Float64Array>()
.unwrap_or_else(|| {
panic!("column '{}' data type not match", tgt_col.name)
});
.ok_or(S3FdwError::ColumnTypeNotMatch(tgt_col.name.clone()))?;
if arr.is_null(self.batch_idx) {
None
} else {
Expand All @@ -302,9 +296,7 @@ impl S3Parquet {
let arr = col
.as_any()
.downcast_ref::<array::BinaryArray>()
.unwrap_or_else(|| {
panic!("column '{}' data type not match", tgt_col.name)
});
.ok_or(S3FdwError::ColumnTypeNotMatch(tgt_col.name.clone()))?;
if arr.is_null(self.batch_idx) {
None
} else {
Expand All @@ -316,9 +308,7 @@ impl S3Parquet {
let arr = col
.as_any()
.downcast_ref::<array::Date64Array>()
.unwrap_or_else(|| {
panic!("column '{}' data type not match", tgt_col.name)
});
.ok_or(S3FdwError::ColumnTypeNotMatch(tgt_col.name.clone()))?;
if arr.is_null(self.batch_idx) {
None
} else {
Expand All @@ -335,9 +325,7 @@ impl S3Parquet {
let arr = col
.as_any()
.downcast_ref::<array::TimestampNanosecondArray>()
.unwrap_or_else(|| {
panic!("column '{}' data type not match", tgt_col.name)
});
.ok_or(S3FdwError::ColumnTypeNotMatch(tgt_col.name.clone()))?;
if arr.is_null(self.batch_idx) {
None
} else {
Expand All @@ -347,19 +335,13 @@ impl S3Parquet {
})
}
}
_ => {
report_error(
PgSqlErrorCode::ERRCODE_FDW_ERROR,
&format!("column '{}' data type not supported", tgt_col.name),
);
None
}
_ => return Err(S3FdwError::UnsupportedColumnType(tgt_col.name.clone())),
};
row.push(&tgt_col.name, cell);
}
self.batch_idx += 1;
return Some(());
return Ok(Some(()));
}
None
Ok(None)
}
}
Loading

0 comments on commit f0416ce

Please sign in to comment.