datafusion/src/physical_plan/file_format/parquet.rs (97 changes: 72 additions & 25 deletions)
@@ -237,14 +237,20 @@ impl ExecutionPlan for ParquetExec {
                 &projection,
                 &pruning_predicate,
                 batch_size,
-                response_tx,
+                response_tx.clone(),
                 limit,
                 partition_col_proj,
             ) {
                 println!(
                     "Parquet reader thread terminated due to error: {:?} for files: {:?}",
                     e, partition
                 );
+                // Send the error back to the main thread.
+                //
+                // Ignore error sending (via `.ok()`) because that
+                // means the receiver has been torn down (and nothing
+                // cares about the errors anymore)
+                send_result(&response_tx, Err(e.into())).ok();
             }
         });

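The comment in the hunk above carries the core of the fix: a failure on the reader thread is now forwarded over the response channel so the consumer actually sees it, and a failed send is deliberately ignored because it can only mean the receiver is gone. A minimal, self-contained sketch of that pattern, using std::sync::mpsc in place of the channel ParquetExec uses, with a hypothetical do_read standing in for read_partition:

    use std::sync::mpsc::{channel, Sender};
    use std::thread;

    // Hypothetical stand-in for read_partition: emits one result, then fails.
    fn do_read(tx: Sender<Result<u32, String>>) -> Result<(), String> {
        tx.send(Ok(1)).map_err(|e| e.to_string())?;
        Err("simulated parquet decode failure".to_string())
    }

    fn main() {
        let (tx, rx) = channel();
        // Clone the sender, mirroring `response_tx.clone()` above: the reader
        // consumes one handle, and the error path keeps the other.
        let reader_tx = tx.clone();
        thread::spawn(move || {
            if let Err(e) = do_read(reader_tx) {
                // Ignore a failed send (via `.ok()`): it means the receiver
                // has been torn down and nothing cares about the error anymore.
                tx.send(Err(e)).ok();
            }
        });
        // The consumer receives the error as a stream item instead of it
        // being swallowed after the println!.
        for msg in rx {
            match msg {
                Ok(n) => println!("got batch {}", n),
                Err(e) => println!("reader failed: {}", e),
            }
        }
    }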
@@ -514,11 +520,14 @@ fn read_partition(
 #[cfg(test)]
 mod tests {
     use crate::{
-        assert_batches_sorted_eq,
+        assert_batches_sorted_eq, assert_contains,
         datasource::{
             file_format::{parquet::ParquetFormat, FileFormat},
-            object_store::local::{
-                local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
+            object_store::{
+                local::{
+                    local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
+                },
+                FileMeta, SizedFile,
             },
         },
         physical_plan::collect,
@@ -547,7 +556,7 @@ mod tests {
         batches: Vec<RecordBatch>,
         projection: Option<Vec<usize>>,
         schema: Option<SchemaRef>,
-    ) -> Vec<RecordBatch> {
+    ) -> Result<Vec<RecordBatch>> {
         // When vec is dropped, temp files are deleted
         let files: Vec<_> = batches
             .into_iter()
@@ -602,9 +611,7 @@
         );

         let runtime = Arc::new(RuntimeEnv::default());
-        collect(Arc::new(parquet_exec), runtime)
-            .await
-            .expect("reading parquet data")
+        collect(Arc::new(parquet_exec), runtime).await
     }

     // Add a new column with the specified field name to the RecordBatch
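Having the helper hand back the Result, instead of calling .expect itself, lets each test decide how to consume it. A sketch of the two call patterns under the new signature, assuming it sits inside the test module above (batch, expected, and bad_schema are placeholders):

    // Success path: unwrap the batches and compare the formatted output.
    let read = round_trip_to_parquet(vec![batch], None, None).await.unwrap();
    assert_batches_sorted_eq!(expected, &read);

    // Failure path: assert on the error message instead of panicking.
    let err = round_trip_to_parquet(vec![batch], None, Some(bad_schema))
        .await
        .unwrap_err();
    assert_contains!(err.to_string(), "Failed to map column projection");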
@@ -649,7 +656,9 @@
         let batch3 = add_to_batch(&batch1, "c3", c3);

         // read/write them files:
-        let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None, None).await;
+        let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None, None)
+            .await
+            .unwrap();
         let expected = vec![
             "+-----+----+----+",
             "| c1 | c2 | c3 |",
@@ -688,7 +697,9 @@
         let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);

         // read/write them files:
-        let read = round_trip_to_parquet(vec![batch1, batch2], None, None).await;
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None)
+            .await
+            .unwrap();
         let expected = vec![
             "+-----+----+----+",
             "| c1 | c2 | c3 |",
@@ -720,7 +731,9 @@
         let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);

         // read/write them files:
-        let read = round_trip_to_parquet(vec![batch1, batch2], None, None).await;
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None)
+            .await
+            .unwrap();
         let expected = vec![
             "+-----+----+----+",
             "| c1 | c3 | c2 |",
@@ -759,8 +772,9 @@
         let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), ("c4", c4)]);

         // read/write them files:
-        let read =
-            round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), None).await;
+        let read = round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), None)
+            .await
+            .unwrap();
         let expected = vec![
             "+-----+-----+",
             "| c1 | c4 |",
@@ -808,18 +822,8 @@
         let read =
             round_trip_to_parquet(vec![batch1, batch2], None, Some(Arc::new(schema)))
                 .await;
-
-        // expect only the first batch to be read
-        let expected = vec![
-            "+-----+----+----+",
-            "| c1 | c2 | c3 |",
-            "+-----+----+----+",
-            "| Foo | 1  | 10 |",
-            "|     | 2  | 20 |",
-            "| bar |    |    |",
-            "+-----+----+----+",
-        ];
-        assert_batches_sorted_eq!(expected, &read);
+        assert_contains!(read.unwrap_err().to_string(),
+            "Execution error: Failed to map column projection for field c3. Incompatible data types Float32 and Int8");
     }

     #[tokio::test]

Inline review comment from the PR author on the assert_contains! lines above: "This test should have always failed -- see #1622 (comment)"
@@ -922,6 +926,49 @@
         Ok(())
     }

+    #[tokio::test]
+    async fn parquet_exec_with_error() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
+        let testdata = crate::test_util::parquet_test_data();
+        let filename = format!("{}/alltypes_plain.parquet", testdata);
+        let partitioned_file = PartitionedFile {
+            file_meta: FileMeta {
+                sized_file: SizedFile {
+                    size: 1337,
+                    path: "invalid".into(),
+                },
+                last_modified: None,
+            },
+            partition_values: vec![],
+        };
+
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store: Arc::new(LocalFileSystem {}),
+                file_groups: vec![vec![partitioned_file]],
+                file_schema: ParquetFormat::default()
+                    .infer_schema(local_object_reader_stream(vec![filename]))
+                    .await?,
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            None,
+        );
+
+        let mut results = parquet_exec.execute(0, runtime).await?;
+        let batch = results.next().await.unwrap();
+        // invalid file should produce an error to that effect
+        assert_contains!(
+            batch.unwrap_err().to_string(),
+            "External error: Parquet error: Arrow: IO error"
+        );
+        assert!(results.next().await.is_none());
+
+        Ok(())
+    }
+
     fn parquet_file_metrics() -> ParquetFileMetrics {
         let metrics = Arc::new(ExecutionPlanMetricsSet::new());
         ParquetFileMetrics::new(0, "file.parquet", &metrics)