Skip to content

Commit 3d5863b

Browse files
authored
Use DataFusionError instead of ArrowError in FileOpenFuture (#17397)
1 parent 4dc1a4f commit 3d5863b

File tree

8 files changed

+69
-59
lines changed

8 files changed

+69
-59
lines changed

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
2626
use arrow::buffer::Buffer;
2727
use arrow::datatypes::SchemaRef;
2828
use arrow_ipc::reader::FileDecoder;
29-
use datafusion_common::Statistics;
29+
use datafusion_common::{exec_datafusion_err, Statistics};
3030
use datafusion_datasource::file::FileSource;
3131
use datafusion_datasource::file_scan_config::FileScanConfig;
3232
use datafusion_datasource::PartitionedFile;
@@ -140,15 +140,19 @@ impl FileOpener for ArrowOpener {
140140
let arrow_reader = arrow::ipc::reader::FileReader::try_new(
141141
file, projection,
142142
)?;
143-
Ok(futures::stream::iter(arrow_reader).boxed())
143+
Ok(futures::stream::iter(arrow_reader)
144+
.map(|r| r.map_err(Into::into))
145+
.boxed())
144146
}
145147
GetResultPayload::Stream(_) => {
146148
let bytes = r.bytes().await?;
147149
let cursor = std::io::Cursor::new(bytes);
148150
let arrow_reader = arrow::ipc::reader::FileReader::try_new(
149151
cursor, projection,
150152
)?;
151-
Ok(futures::stream::iter(arrow_reader).boxed())
153+
Ok(futures::stream::iter(arrow_reader)
154+
.map(|r| r.map_err(Into::into))
155+
.boxed())
152156
}
153157
}
154158
}
@@ -179,9 +183,7 @@ impl FileOpener for ArrowOpener {
179183
footer_buf[..footer_len].try_into().unwrap(),
180184
)
181185
.map_err(|err| {
182-
arrow::error::ArrowError::ParseError(format!(
183-
"Unable to get root as footer: {err:?}"
184-
))
186+
exec_datafusion_err!("Unable to get root as footer: {err:?}")
185187
})?;
186188
// build decoder according to footer & projection
187189
let schema =
@@ -248,6 +250,7 @@ impl FileOpener for ArrowOpener {
248250
.transpose()
249251
}),
250252
)
253+
.map(|r| r.map_err(Into::into))
251254
.boxed())
252255
}
253256
}

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
// under the License.
1717

1818
use arrow::datatypes::SchemaRef;
19-
use arrow::error::ArrowError;
2019
use arrow::{array::RecordBatch, compute::concat_batches};
2120
use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr};
2221
use datafusion_common::{config::ConfigOptions, internal_err, Result, Statistics};
@@ -39,7 +38,7 @@ use datafusion_physical_plan::{
3938
metrics::ExecutionPlanMetricsSet,
4039
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
4140
};
42-
use futures::stream::BoxStream;
41+
use futures::StreamExt;
4342
use futures::{FutureExt, Stream};
4443
use object_store::ObjectStore;
4544
use std::{
@@ -93,12 +92,7 @@ impl FileOpener for TestOpener {
9392

9493
let stream = TestStream::new(batches);
9594

96-
Ok((async {
97-
let stream: BoxStream<'static, Result<RecordBatch, ArrowError>> =
98-
Box::pin(stream);
99-
Ok(stream)
100-
})
101-
.boxed())
95+
Ok((async { Ok(stream.boxed()) }).boxed())
10296
}
10397
}
10498

@@ -344,7 +338,7 @@ impl TestStream {
344338
}
345339

346340
impl Stream for TestStream {
347-
type Item = Result<RecordBatch, ArrowError>;
341+
type Item = Result<RecordBatch>;
348342

349343
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
350344
let next_batch = self.index.value();

datafusion/datasource-avro/src/source.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,12 +169,16 @@ mod private {
169169
match r.payload {
170170
GetResultPayload::File(file, _) => {
171171
let reader = config.open(file)?;
172-
Ok(futures::stream::iter(reader).boxed())
172+
Ok(futures::stream::iter(reader)
173+
.map(|r| r.map_err(Into::into))
174+
.boxed())
173175
}
174176
GetResultPayload::Stream(_) => {
175177
let bytes = r.bytes().await?;
176178
let reader = config.open(bytes.reader())?;
177-
Ok(futures::stream::iter(reader).boxed())
179+
Ok(futures::stream::iter(reader)
180+
.map(|r| r.map_err(Into::into))
181+
.boxed())
178182
}
179183
}
180184
}))

datafusion/datasource-csv/src/source.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -392,17 +392,20 @@ impl FileOpener for CsvOpener {
392392
)?
393393
};
394394

395-
Ok(futures::stream::iter(config.open(decoder)?).boxed())
395+
Ok(futures::stream::iter(config.open(decoder)?)
396+
.map(|r| r.map_err(Into::into))
397+
.boxed())
396398
}
397399
GetResultPayload::Stream(s) => {
398400
let decoder = config.builder().build_decoder();
399401
let s = s.map_err(DataFusionError::from);
400402
let input = file_compression_type.convert_stream(s.boxed())?.fuse();
401403

402-
Ok(deserialize_stream(
404+
let stream = deserialize_stream(
403405
input,
404406
DecoderDeserializer::new(CsvDecoder::new(decoder)),
405-
))
407+
);
408+
Ok(stream.map_err(Into::into).boxed())
406409
}
407410
}
408411
}))

datafusion/datasource-json/src/source.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,9 @@ impl FileOpener for JsonOpener {
222222
.with_batch_size(batch_size)
223223
.build(BufReader::new(bytes))?;
224224

225-
Ok(futures::stream::iter(reader).boxed())
225+
Ok(futures::stream::iter(reader)
226+
.map(|r| r.map_err(Into::into))
227+
.boxed())
226228
}
227229
GetResultPayload::Stream(s) => {
228230
let s = s.map_err(DataFusionError::from);
@@ -232,10 +234,11 @@ impl FileOpener for JsonOpener {
232234
.build_decoder()?;
233235
let input = file_compression_type.convert_stream(s.boxed())?.fuse();
234236

235-
Ok(deserialize_stream(
237+
let stream = deserialize_stream(
236238
input,
237239
DecoderDeserializer::new(JsonDecoder::new(decoder)),
238-
))
240+
);
241+
Ok(stream.map_err(Into::into).boxed())
239242
}
240243
}
241244
}))

datafusion/datasource-parquet/src/opener.rs

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use std::sync::Arc;
3232
use std::task::{Context, Poll};
3333

3434
use arrow::datatypes::{FieldRef, SchemaRef, TimeUnit};
35-
use arrow::error::ArrowError;
3635
use datafusion_common::encryption::FileDecryptionProperties;
3736

3837
use datafusion_common::{exec_err, DataFusionError, Result};
@@ -414,8 +413,8 @@ impl FileOpener for ParquetOpener {
414413
.build()?;
415414

416415
let stream = stream
417-
.map_err(|e| ArrowError::ExternalError(Box::new(e)))
418-
.map(move |b| b.and_then(|b| Ok(schema_mapping.map_batch(b)?)));
416+
.map_err(DataFusionError::from)
417+
.map(move |b| b.and_then(|b| schema_mapping.map_batch(b)));
419418

420419
if let Some(file_pruner) = file_pruner {
421420
Ok(EarlyStoppingStream::new(
@@ -462,12 +461,9 @@ impl<S> EarlyStoppingStream<S> {
462461
}
463462
impl<S> EarlyStoppingStream<S>
464463
where
465-
S: Stream<Item = Result<RecordBatch, ArrowError>> + Unpin,
464+
S: Stream<Item = Result<RecordBatch>> + Unpin,
466465
{
467-
fn check_prune(
468-
&mut self,
469-
input: Result<RecordBatch, ArrowError>,
470-
) -> Result<Option<RecordBatch>, ArrowError> {
466+
fn check_prune(&mut self, input: Result<RecordBatch>) -> Result<Option<RecordBatch>> {
471467
let batch = input?;
472468

473469
// Since dynamic filters may have been updated, see if we can stop
@@ -485,9 +481,9 @@ where
485481

486482
impl<S> Stream for EarlyStoppingStream<S>
487483
where
488-
S: Stream<Item = Result<RecordBatch, ArrowError>> + Unpin,
484+
S: Stream<Item = Result<RecordBatch>> + Unpin,
489485
{
490-
type Item = Result<RecordBatch, ArrowError>;
486+
type Item = Result<RecordBatch>;
491487

492488
fn poll_next(
493489
mut self: Pin<&mut Self>,
@@ -697,8 +693,8 @@ mod test {
697693
use bytes::{BufMut, BytesMut};
698694
use chrono::Utc;
699695
use datafusion_common::{
700-
assert_batches_eq, record_batch, stats::Precision, ColumnStatistics, ScalarValue,
701-
Statistics,
696+
assert_batches_eq, record_batch, stats::Precision, ColumnStatistics,
697+
DataFusionError, ScalarValue, Statistics,
702698
};
703699
use datafusion_datasource::{
704700
file_meta::FileMeta,
@@ -724,12 +720,8 @@ mod test {
724720
async fn count_batches_and_rows(
725721
mut stream: std::pin::Pin<
726722
Box<
727-
dyn Stream<
728-
Item = Result<
729-
arrow::array::RecordBatch,
730-
arrow::error::ArrowError,
731-
>,
732-
> + Send,
723+
dyn Stream<Item = Result<arrow::array::RecordBatch, DataFusionError>>
724+
+ Send,
733725
>,
734726
>,
735727
) -> (usize, usize) {
@@ -745,12 +737,8 @@ mod test {
745737
async fn collect_batches(
746738
mut stream: std::pin::Pin<
747739
Box<
748-
dyn Stream<
749-
Item = Result<
750-
arrow::array::RecordBatch,
751-
arrow::error::ArrowError,
752-
>,
753-
> + Send,
740+
dyn Stream<Item = Result<arrow::array::RecordBatch, DataFusionError>>
741+
+ Send,
754742
>,
755743
>,
756744
) -> Vec<arrow::array::RecordBatch> {

datafusion/datasource/src/file_stream.rs

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ use datafusion_physical_plan::metrics::{
3737
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
3838
};
3939

40-
use arrow::error::ArrowError;
4140
use arrow::record_batch::RecordBatch;
4241
use datafusion_common::instant::Instant;
4342
use datafusion_common::ScalarValue;
@@ -225,7 +224,6 @@ impl FileStream {
225224
let result = self
226225
.pc_projector
227226
.project(batch, partition_values)
228-
.map_err(|e| ArrowError::ExternalError(e.into()))
229227
.map(|batch| match &mut self.remain {
230228
Some(remain) => {
231229
if *remain > batch.num_rows() {
@@ -247,7 +245,7 @@ impl FileStream {
247245
self.state = FileStreamState::Error
248246
}
249247
self.file_stream_metrics.time_scanning_total.start();
250-
return Poll::Ready(Some(result.map_err(Into::into)));
248+
return Poll::Ready(Some(result));
251249
}
252250
Some(Err(err)) => {
253251
self.file_stream_metrics.file_scan_errors.add(1);
@@ -281,7 +279,7 @@ impl FileStream {
281279
},
282280
OnError::Fail => {
283281
self.state = FileStreamState::Error;
284-
return Poll::Ready(Some(Err(err.into())));
282+
return Poll::Ready(Some(Err(err)));
285283
}
286284
}
287285
}
@@ -345,7 +343,7 @@ impl RecordBatchStream for FileStream {
345343

346344
/// A fallible future that resolves to a stream of [`RecordBatch`]
347345
pub type FileOpenFuture =
348-
BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;
346+
BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch>>>>;
349347

350348
/// Describes the behavior of the `FileStream` if file opening or scanning fails
351349
pub enum OnError {
@@ -376,7 +374,7 @@ pub trait FileOpener: Unpin + Send + Sync {
376374
/// is ready
377375
pub enum NextOpen {
378376
Pending(FileOpenFuture),
379-
Ready(Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>),
377+
Ready(Result<BoxStream<'static, Result<RecordBatch>>>),
380378
}
381379

382380
pub enum FileStreamState {
@@ -396,7 +394,7 @@ pub enum FileStreamState {
396394
/// Partitioning column values for the current batch_iter
397395
partition_values: Vec<ScalarValue>,
398396
/// The reader instance
399-
reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
397+
reader: BoxStream<'static, Result<RecordBatch>>,
400398
/// A [`FileOpenFuture`] for the next file to be processed,
401399
/// and its corresponding partition column values, if any.
402400
/// This allows the next file to be opened in parallel while the
@@ -526,7 +524,6 @@ mod tests {
526524
use crate::file_scan_config::FileScanConfigBuilder;
527525
use crate::tests::make_partition;
528526
use crate::PartitionedFile;
529-
use arrow::error::ArrowError;
530527
use datafusion_common::error::Result;
531528
use datafusion_execution::object_store::ObjectStoreUrl;
532529
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -540,7 +537,7 @@ mod tests {
540537
use arrow::array::RecordBatch;
541538
use arrow::datatypes::Schema;
542539

543-
use datafusion_common::{assert_batches_eq, internal_err};
540+
use datafusion_common::{assert_batches_eq, exec_err, internal_err};
544541

545542
/// Test `FileOpener` which will simulate errors during file opening or scanning
546543
#[derive(Default)]
@@ -566,9 +563,7 @@ mod tests {
566563
if self.error_opening_idx.contains(&idx) {
567564
Ok(futures::future::ready(internal_err!("error opening")).boxed())
568565
} else if self.error_scanning_idx.contains(&idx) {
569-
let error = futures::future::ready(Err(ArrowError::IpcError(
570-
"error scanning".to_owned(),
571-
)));
566+
let error = futures::future::ready(exec_err!("error scanning"));
572567
let stream = futures::stream::once(error).boxed();
573568
Ok(futures::future::ready(Ok(stream)).boxed())
574569
} else {

docs/source/library-user-guide/upgrading.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,26 @@ Reimplementation for any custom `DataSource` should be relatively straightforwar
265265

266266
[#17395]: https://github.com/apache/datafusion/pull/17395/
267267

268+
### `FileOpenFuture` now uses `DataFusionError` instead of `ArrowError`
269+
270+
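The `FileOpenFuture` type alias has been updated so that the record batch stream it resolves to yields `Result<RecordBatch, DataFusionError>` instead of `Result<RecordBatch, ArrowError>`. The outer `Result` of the future itself is unchanged. This change affects the `FileOpener` trait and any implementations that work with file streaming operations.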
271+
272+
**Before:**
273+
274+
```rust,ignore
275+
pub type FileOpenFuture = BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;
276+
```
277+
278+
**After:**
279+
280+
```rust,ignore
281+
pub type FileOpenFuture = BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch>>>>;
282+
```
283+
284+
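If you have custom implementations of `FileOpener` or work directly with `FileOpenFuture`, you'll need to update your error handling to use `DataFusionError` instead of `ArrowError`. For a stream that still yields `ArrowError` items, converting each item with `.map(|r| r.map_err(Into::into))` before calling `.boxed()` is typically all that is required. The `FileStreamState` enum's `Open` variant has also been updated accordingly, as have the `reader` stream held by `FileStreamState` and the `NextOpen::Ready` variant. See [#17397] for more details.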
285+
286+
[#17397]: https://github.com/apache/datafusion/pull/17397
287+
268288
## DataFusion `49.0.0`
269289

270290
### `MSRV` updated to 1.85.1

0 commit comments

Comments
 (0)