Skip to content

Conversation

@adriangb
Copy link
Contributor

@adriangb adriangb commented Sep 4, 2025

I would like to re-use ProjectionStream for #14993:

struct ProjectionStream {
schema: SchemaRef,
expr: Vec<Arc<dyn PhysicalExpr>>,
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
}
impl Stream for ProjectionStream {
type Item = Result<RecordBatch>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let poll = self.input.poll_next_unpin(cx).map(|x| match x {
Some(Ok(batch)) => Some(self.batch_project(&batch)),
other => other,
});
self.baseline_metrics.record_poll(poll)
}
fn size_hint(&self) -> (usize, Option<usize>) {
// Same number of record batches
self.input.size_hint()
}
}

A barrier to this is that the FileOpenerFuture stream uses ArrowError while the ExecutionPlan::execute stream uses DataFusionError. I don't see any reason for this so I propose we change it.

Within DataFusion we should not really be creating ArrowErrors, I believe we should almost always use DataFusionError.

Upgrade guide note aside this is a -3 LOC diff -> reducing complexity (less error types, less lines of code).

@github-actions github-actions bot added core Core DataFusion crate datasource Changes to the datasource crate labels Sep 4, 2025
@adriangb adriangb requested a review from blaginin September 4, 2025 00:21
@github-actions github-actions bot added the documentation Improvements or additions to documentation label Sep 4, 2025
)
.map_err(|err| {
arrow::error::ArrowError::ParseError(format!(
datafusion_common::DataFusionError::Execution(format!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
datafusion_common::DataFusionError::Execution(format!(
exec_err!(

Ok(futures::future::ready(internal_err!("error opening")).boxed())
} else if self.error_scanning_idx.contains(&idx) {
let error = futures::future::ready(Err(ArrowError::IpcError(
let error = futures::future::ready(Err(DataFusionError::Execution(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let error = futures::future::ready(Err(DataFusionError::Execution(
let error = futures::future::ready(exec_err!(

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Result<RecordBatch, DataFusionError> can be replaced with pub type Result<T, E = DataFusionError> = result::Result<T, E>; declared in datafusion/common/src/error.rs

@comphead comphead added the api change Changes the API exposed to users of the crate label Sep 4, 2025
…ub type Result<T, E = DataFusionError> = result::Result<T, E>;` declared in `datafusion/common/src/error.rs`
@adriangb
Copy link
Contributor Author

adriangb commented Sep 4, 2025

Thanks @comphead ! Addressed your feedback

/// A fallible future that resolves to a stream of [`RecordBatch`]
pub type FileOpenFuture =
BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;
BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch>>>>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is amazing how rustc can distinguish Result implementations 😮


impl Stream for TestStream {
type Item = Result<RecordBatch, ArrowError>;
type Item = Result<RecordBatch, datafusion_common::DataFusionError>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the Item also reuse DF Result?

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm thanks @adriangb

@adriangb
Copy link
Contributor Author

adriangb commented Sep 4, 2025

I also spoke with @alamb about this change, he agreed it's a positive change and that the breakage should be minimal because (1) not many people are implementing FileOpenFuture and (2) there is an impl From<ArrowError> for DataFusionError.

@adriangb adriangb merged commit 3d5863b into apache:main Sep 4, 2025
29 checks passed
@adriangb adriangb deleted the use-df-error branch September 4, 2025 18:24
destrex271 pushed a commit to destrex271/datafusion that referenced this pull request Sep 5, 2025
@alamb
Copy link
Contributor

alamb commented Sep 5, 2025

Also, I think the File opener trait is somewhat specialized / not used all that widely, so this change will likely not affect that many people

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

api change Changes the API exposed to users of the crate core Core DataFusion crate datasource Changes to the datasource crate documentation Improvements or additions to documentation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants