Skip to content

Commit 75d54cd

Browse files
committed
Use DataFusionError instead of ArrowError in FileOpenFuture
1 parent 8d09a96 commit 75d54cd

File tree

3 files changed

+9
-17
lines changed

3 files changed

+9
-17
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use arrow::datatypes::SchemaRef;
1919
use arrow::{array::RecordBatch, compute::concat_batches};
2020
use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr};
21+
use futures::StreamExt;
2122
use datafusion_common::{config::ConfigOptions, internal_err, Result, Statistics};
2223
use datafusion_datasource::{
2324
file::FileSource, file_meta::FileMeta, file_scan_config::FileScanConfig,
@@ -38,7 +39,6 @@ use datafusion_physical_plan::{
3839
metrics::ExecutionPlanMetricsSet,
3940
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
4041
};
41-
use futures::stream::BoxStream;
4242
use futures::{FutureExt, Stream};
4343
use object_store::ObjectStore;
4444
use std::{
@@ -93,11 +93,7 @@ impl FileOpener for TestOpener {
9393
let stream = TestStream::new(batches);
9494

9595
Ok((async {
96-
let stream: BoxStream<
97-
'static,
98-
Result<RecordBatch, datafusion_common::DataFusionError>,
99-
> = Box::pin(stream.map(|r| r.map_err(Into::into)));
100-
Ok(stream)
96+
Ok(stream.boxed())
10197
})
10298
.boxed())
10399
}

datafusion/datasource-parquet/src/opener.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -411,8 +411,8 @@ impl FileOpener for ParquetOpener {
411411
.build()?;
412412

413413
let stream = stream
414-
.map_err(|e| DataFusionError::from(e))
415-
.map(move |b| b.and_then(|b| Ok(schema_mapping.map_batch(b)?)));
414+
.map_err(DataFusionError::from)
415+
.map(move |b| b.and_then(|b| schema_mapping.map_batch(b)));
416416

417417
if let Some(file_pruner) = file_pruner {
418418
Ok(EarlyStoppingStream::new(

datafusion/datasource/src/file_stream.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ use datafusion_physical_plan::metrics::{
3737
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
3838
};
3939

40-
use arrow::error::ArrowError;
4140
use arrow::record_batch::RecordBatch;
4241
use datafusion_common::instant::Instant;
4342
use datafusion_common::{DataFusionError, ScalarValue};
@@ -225,7 +224,6 @@ impl FileStream {
225224
let result = self
226225
.pc_projector
227226
.project(batch, partition_values)
228-
.map_err(|e| ArrowError::ExternalError(e.into()))
229227
.map(|batch| match &mut self.remain {
230228
Some(remain) => {
231229
if *remain > batch.num_rows() {
@@ -247,7 +245,7 @@ impl FileStream {
247245
self.state = FileStreamState::Error
248246
}
249247
self.file_stream_metrics.time_scanning_total.start();
250-
return Poll::Ready(Some(result.map_err(Into::into)));
248+
return Poll::Ready(Some(result));
251249
}
252250
Some(Err(err)) => {
253251
self.file_stream_metrics.file_scan_errors.add(1);
@@ -281,7 +279,7 @@ impl FileStream {
281279
},
282280
OnError::Fail => {
283281
self.state = FileStreamState::Error;
284-
return Poll::Ready(Some(Err(err.into())));
282+
return Poll::Ready(Some(Err(err)));
285283
}
286284
}
287285
}
@@ -526,7 +524,6 @@ mod tests {
526524
use crate::file_scan_config::FileScanConfigBuilder;
527525
use crate::tests::make_partition;
528526
use crate::PartitionedFile;
529-
use arrow::error::ArrowError;
530527
use datafusion_common::error::Result;
531528
use datafusion_execution::object_store::ObjectStoreUrl;
532529
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -540,7 +537,7 @@ mod tests {
540537
use arrow::array::RecordBatch;
541538
use arrow::datatypes::Schema;
542539

543-
use datafusion_common::{assert_batches_eq, internal_err};
540+
use datafusion_common::{assert_batches_eq, internal_err, DataFusionError};
544541

545542
/// Test `FileOpener` which will simulate errors during file opening or scanning
546543
#[derive(Default)]
@@ -566,10 +563,9 @@ mod tests {
566563
if self.error_opening_idx.contains(&idx) {
567564
Ok(futures::future::ready(internal_err!("error opening")).boxed())
568565
} else if self.error_scanning_idx.contains(&idx) {
569-
let error = futures::future::ready(Err(ArrowError::IpcError(
566+
let error = futures::future::ready(Err(DataFusionError::Execution(
570567
"error scanning".to_owned(),
571-
)
572-
.into()));
568+
)));
573569
let stream = futures::stream::once(error).boxed();
574570
Ok(futures::future::ready(Ok(stream)).boxed())
575571
} else {

0 commit comments

Comments
 (0)