Skip to content

Commit d2ce852

Browse files
author
Jiayu Liu
committed
add streams
1 parent 0a861a7 commit d2ce852

File tree

4 files changed

+98
-39
lines changed

4 files changed

+98
-39
lines changed

datafusion/src/physical_plan/aggregates.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ pub fn return_type(fun: &AggregateFunction, arg_types: &[DataType]) -> Result<Da
105105

106106
/// Create a physical (function) expression.
107107
/// This function errors when `args` can't be coerced to a valid argument type of the function.
108-
pub fn create_aggregate_expr(
108+
pub(super) fn create_aggregate_expr(
109109
fun: &AggregateFunction,
110110
distinct: bool,
111111
args: &[Arc<dyn PhysicalExpr>],

datafusion/src/physical_plan/hash_aggregate.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,7 @@ impl GroupedHashAggregateStream {
712712
tx.send(result)
713713
});
714714

715-
GroupedHashAggregateStream {
715+
Self {
716716
schema,
717717
output: rx,
718718
finished: false,
@@ -825,6 +825,7 @@ fn aggregate_expressions(
825825
}
826826

827827
pin_project! {
828+
/// stream struct for hash aggregation
828829
pub struct HashAggregateStream {
829830
schema: SchemaRef,
830831
#[pin]

datafusion/src/physical_plan/sort.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ fn sort_batches(
227227
}
228228

229229
pin_project! {
230+
/// stream for sort plan
230231
struct SortStream {
231232
#[pin]
232233
output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,

datafusion/src/physical_plan/windows.rs

Lines changed: 94 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,24 @@
1919
2020
use crate::error::{DataFusionError, Result};
2121
use crate::physical_plan::{
22-
aggregates, window_functions::WindowFunction, AggregateExpr, Distribution,
23-
ExecutionPlan, Partitioning, PhysicalExpr, SendableRecordBatchStream, WindowExpr,
22+
aggregates, expressions::Column, window_functions::WindowFunction, AggregateExpr,
23+
Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
24+
SendableRecordBatchStream, WindowExpr,
25+
};
26+
use arrow::{
27+
array::{Array, UInt32Builder},
28+
datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit},
29+
error::{ArrowError, Result as ArrowResult},
30+
record_batch::RecordBatch,
2431
};
25-
use arrow::datatypes::{Field, Schema, SchemaRef};
2632
use async_trait::async_trait;
33+
use futures::stream::Stream;
34+
use futures::stream::StreamExt;
35+
use pin_project_lite::pin_project;
2736
use std::any::Any;
37+
use std::pin::Pin;
2838
use std::sync::Arc;
39+
use std::task::{Context, Poll};
2940

3041
/// Window execution plan
3142
#[derive(Debug)]
@@ -195,43 +206,89 @@ impl ExecutionPlan for WindowAggExec {
195206

196207
let input = self.input.execute(partition).await?;
197208

198-
199-
200-
201209
Err(DataFusionError::NotImplemented(
202210
"WindowAggExec::execute".to_owned(),
203211
))
204212
}
205213
}
206214

207-
// struct WindowAggStream {
208-
// scheme: SchemaRef,
209-
// window_expr: Vec<Arc<dyn WindowExpr>>,
210-
// input: SendableRecordBatchStream,
211-
// }
212-
213-
// impl Stream for WindowAggStream {
214-
// type Item = ArrowResult<RecordBatch>;
215-
216-
// fn poll_next(
217-
// mut self: Pin<&mut Self>,
218-
// cx: &mut Context<'_>,
219-
// ) -> Poll<Option<Self::Item>> {
220-
// self.input.poll_next_unpin(cx).map(|x| match x {
221-
// Some(Ok(batch)) => Some(batch_project(&batch, &self.expr, &self.schema)),
222-
// other => other,
223-
// })
224-
// }
225-
226-
// fn size_hint(&self) -> (usize, Option<usize>) {
227-
// // same number of record batches
228-
// self.input.size_hint()
229-
// }
230-
// }
231-
232-
// impl RecordBatchStream for WindowAggStream {
233-
// /// Get the schema
234-
// fn schema(&self) -> SchemaRef {
235-
// self.schema.clone()
236-
// }
237-
// }
215+
pin_project! {
    /// Record batch stream returned by the window aggregation plan.
    ///
    /// The result is delivered through a one-shot channel, so the stream
    /// yields at most one item before it terminates.
    pub struct WindowAggStream {
        // Schema reported through `RecordBatchStream::schema`.
        schema: SchemaRef,
        // Set once the channel has produced its single message (or failed).
        finished: bool,
        // Receiving half of the channel fed by the task spawned in `new`.
        #[pin]
        output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
    }
}
224+
225+
async fn compute_window_aggregate(
226+
schema: SchemaRef,
227+
window_expr: Vec<Arc<dyn WindowExpr>>,
228+
mut input: SendableRecordBatchStream,
229+
) -> ArrowResult<Option<RecordBatch>> {
230+
unimplemented!("not implemented")
231+
}
232+
233+
impl WindowAggStream {
234+
fn new(
235+
schema: SchemaRef,
236+
input: SendableRecordBatchStream,
237+
window_expr: Vec<Arc<dyn WindowExpr>>,
238+
) -> Self {
239+
let (tx, rx) = futures::channel::oneshot::channel();
240+
let schema_clone = schema.clone();
241+
tokio::spawn(async move {
242+
let result = compute_window_aggregate(schema_clone, window_expr, input).await;
243+
tx.send(result)
244+
});
245+
246+
Self {
247+
output: rx,
248+
finished: false,
249+
schema,
250+
}
251+
}
252+
}
253+
254+
impl Stream for WindowAggStream {
255+
type Item = ArrowResult<RecordBatch>;
256+
257+
fn poll_next(
258+
mut self: Pin<&mut Self>,
259+
cx: &mut Context<'_>,
260+
) -> Poll<Option<Self::Item>> {
261+
if self.finished {
262+
return Poll::Ready(None);
263+
}
264+
265+
// is the output ready?
266+
let this = self.project();
267+
let output_poll = this.output.poll(cx);
268+
269+
match output_poll {
270+
Poll::Ready(result) => {
271+
*this.finished = true;
272+
// check for error in receiving channel and unwrap actual result
273+
let result = match result {
274+
Err(e) => Err(ArrowError::ExternalError(Box::new(e))), // error receiving
275+
Ok(result) => result,
276+
};
277+
Poll::Ready(Some(result))
278+
}
279+
Poll::Pending => Poll::Pending,
280+
}
281+
}
282+
283+
fn size_hint(&self) -> (usize, Option<usize>) {
284+
// same number of record batches
285+
self.input.size_hint()
286+
}
287+
}
288+
289+
impl RecordBatchStream for WindowAggStream {
290+
/// Get the schema
291+
fn schema(&self) -> SchemaRef {
292+
self.schema.clone()
293+
}
294+
}

0 commit comments

Comments
 (0)