Skip to content

Commit bf5b8a5

Browse files
author
Jiayu Liu
committed
save
1 parent d2ce852 commit bf5b8a5

File tree

2 files changed

+5
-9
lines changed

2 files changed

+5
-9
lines changed

datafusion/src/physical_plan/aggregates.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ pub fn return_type(fun: &AggregateFunction, arg_types: &[DataType]) -> Result<Da
105105

106106
/// Create a physical (function) expression.
107107
/// This function errors when `args`' can't be coerced to a valid argument type of the function.
108-
pub(super) fn create_aggregate_expr(
108+
pub fn create_aggregate_expr(
109109
fun: &AggregateFunction,
110110
distinct: bool,
111111
args: &[Arc<dyn PhysicalExpr>],

datafusion/src/physical_plan/windows.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use arrow::{
3232
use async_trait::async_trait;
3333
use futures::stream::Stream;
3434
use futures::stream::StreamExt;
35+
use futures::Future;
3536
use pin_project_lite::pin_project;
3637
use std::any::Any;
3738
use std::pin::Pin;
@@ -271,19 +272,14 @@ impl Stream for WindowAggStream {
271272
*this.finished = true;
272273
// check for error in receiving channel and unwrap actual result
273274
let result = match result {
274-
Err(e) => Err(ArrowError::ExternalError(Box::new(e))), // error receiving
275-
Ok(result) => result,
275+
Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
276+
Ok(result) => result.transpose(),
276277
};
277-
Poll::Ready(Some(result))
278+
Poll::Ready(result)
278279
}
279280
Poll::Pending => Poll::Pending,
280281
}
281282
}
282-
283-
fn size_hint(&self) -> (usize, Option<usize>) {
284-
// same number of record batches
285-
self.input.size_hint()
286-
}
287283
}
288284

289285
impl RecordBatchStream for WindowAggStream {

0 commit comments

Comments
 (0)