Skip to content

Commit c0c65d1

Browse files
committed
physical sort crate
1 parent 407adc0 commit c0c65d1

File tree

11 files changed

+381
-192
lines changed

11 files changed

+381
-192
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ members = [
2020
"datafusion",
2121
"datafusion-common",
2222
"datafusion-expr",
23+
"datafusion-physical-expr",
2324
"datafusion-cli",
2425
"datafusion-examples",
2526
"benchmarks",
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "datafusion-physical-expr"
20+
description = "DataFusion is an in-memory query engine that uses Apache Arrow as the memory model"
21+
version = "7.0.0"
22+
homepage = "https://github.com/apache/arrow-datafusion"
23+
repository = "https://github.com/apache/arrow-datafusion"
24+
readme = "../README.md"
25+
authors = ["Apache Arrow <dev@arrow.apache.org>"]
26+
license = "Apache-2.0"
27+
keywords = [ "arrow", "query", "sql" ]
28+
edition = "2021"
29+
rust-version = "1.58"
30+
31+
[lib]
32+
name = "datafusion_physical_expr"
33+
path = "src/lib.rs"
34+
35+
[features]
36+
37+
[dependencies]
38+
datafusion-common = { path = "../datafusion-common", version = "7.0.0" }
39+
datafusion-expr = { path = "../datafusion-expr", version = "7.0.0" }
40+
arrow = { version = "9.0.0", features = ["prettyprint"] }

datafusion-physical-expr/README.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<!---
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# DataFusion Physical Expr
21+
22+
This is an internal module for fundamental physical expression types of [DataFusion][df].
23+
24+
[df]: https://crates.io/crates/datafusion
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::PhysicalExpr;
19+
20+
use arrow::datatypes::Field;
21+
use datafusion_common::Result;
22+
use datafusion_expr::Accumulator;
23+
use std::fmt::Debug;
24+
25+
use std::any::Any;
26+
use std::sync::Arc;
27+
28+
/// An aggregate expression that:
29+
/// * knows its resulting field
30+
/// * knows how to create its accumulator
31+
/// * knows its accumulator's state's field
32+
/// * knows the expressions from whose its accumulator will receive values
33+
pub trait AggregateExpr: Send + Sync + Debug {
34+
/// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
35+
/// downcast to a specific implementation.
36+
fn as_any(&self) -> &dyn Any;
37+
38+
/// the field of the final result of this aggregation.
39+
fn field(&self) -> Result<Field>;
40+
41+
/// the accumulator used to accumulate values from the expressions.
42+
/// the accumulator expects the same number of arguments as `expressions` and must
43+
/// return states with the same description as `state_fields`
44+
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>>;
45+
46+
/// the fields that encapsulate the Accumulator's state
47+
/// the number of fields here equals the number of states that the accumulator contains
48+
fn state_fields(&self) -> Result<Vec<Field>>;
49+
50+
/// expressions that are passed to the Accumulator.
51+
/// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
52+
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
53+
54+
/// Human readable name such as `"MIN(c2)"`. The default
55+
/// implementation returns placeholder text.
56+
fn name(&self) -> &str {
57+
"AggregateExpr: default name"
58+
}
59+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
mod aggregate_expr;
19+
mod physical_expr;
20+
mod sort_expr;
21+
mod window_expr;
22+
23+
pub use aggregate_expr::AggregateExpr;
24+
pub use physical_expr::PhysicalExpr;
25+
pub use sort_expr::PhysicalSortExpr;
26+
pub use window_expr::WindowExpr;
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::datatypes::{DataType, Schema};
19+
20+
use arrow::record_batch::RecordBatch;
21+
22+
use datafusion_common::Result;
23+
24+
use datafusion_expr::ColumnarValue;
25+
use std::fmt::{Debug, Display};
26+
27+
use std::any::Any;
28+
29+
/// Expression that can be evaluated against a RecordBatch
30+
/// A Physical expression knows its type, nullability and how to evaluate itself.
31+
pub trait PhysicalExpr: Send + Sync + Display + Debug {
32+
/// Returns the physical expression as [`Any`](std::any::Any) so that it can be
33+
/// downcast to a specific implementation.
34+
fn as_any(&self) -> &dyn Any;
35+
/// Get the data type of this expression, given the schema of the input
36+
fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
37+
/// Determine whether this expression is nullable, given the schema of the input
38+
fn nullable(&self, input_schema: &Schema) -> Result<bool>;
39+
/// Evaluate an expression against a RecordBatch
40+
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
41+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::PhysicalExpr;
19+
use arrow::compute::kernels::sort::{SortColumn, SortOptions};
20+
use arrow::record_batch::RecordBatch;
21+
use datafusion_common::{DataFusionError, Result};
22+
use datafusion_expr::ColumnarValue;
23+
use std::sync::Arc;
24+
25+
/// Represents Sort operation for a column in a RecordBatch
26+
#[derive(Clone, Debug)]
27+
pub struct PhysicalSortExpr {
28+
/// Physical expression representing the column to sort
29+
pub expr: Arc<dyn PhysicalExpr>,
30+
/// Option to specify how the given column should be sorted
31+
pub options: SortOptions,
32+
}
33+
34+
impl std::fmt::Display for PhysicalSortExpr {
35+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
36+
let opts_string = match (self.options.descending, self.options.nulls_first) {
37+
(true, true) => "DESC",
38+
(true, false) => "DESC NULLS LAST",
39+
(false, true) => "ASC",
40+
(false, false) => "ASC NULLS LAST",
41+
};
42+
43+
write!(f, "{} {}", self.expr, opts_string)
44+
}
45+
}
46+
47+
impl PhysicalSortExpr {
48+
/// evaluate the sort expression into SortColumn that can be passed into arrow sort kernel
49+
pub fn evaluate_to_sort_column(&self, batch: &RecordBatch) -> Result<SortColumn> {
50+
let value_to_sort = self.expr.evaluate(batch)?;
51+
let array_to_sort = match value_to_sort {
52+
ColumnarValue::Array(array) => array,
53+
ColumnarValue::Scalar(scalar) => {
54+
return Err(DataFusionError::Plan(format!(
55+
"Sort operation is not applicable to scalar value {}",
56+
scalar
57+
)));
58+
}
59+
};
60+
Ok(SortColumn {
61+
values: array_to_sort,
62+
options: Some(self.options),
63+
})
64+
}
65+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::{PhysicalExpr, PhysicalSortExpr};
19+
use arrow::compute::kernels::partition::lexicographical_partition_ranges;
20+
use arrow::compute::kernels::sort::{SortColumn, SortOptions};
21+
use arrow::record_batch::RecordBatch;
22+
use arrow::{array::ArrayRef, datatypes::Field};
23+
use datafusion_common::{DataFusionError, Result};
24+
use std::any::Any;
25+
use std::fmt::Debug;
26+
use std::ops::Range;
27+
use std::sync::Arc;
28+
29+
/// A window expression that:
30+
/// * knows its resulting field
31+
pub trait WindowExpr: Send + Sync + Debug {
32+
/// Returns the window expression as [`Any`](std::any::Any) so that it can be
33+
/// downcast to a specific implementation.
34+
fn as_any(&self) -> &dyn Any;
35+
36+
/// the field of the final result of this window function.
37+
fn field(&self) -> Result<Field>;
38+
39+
/// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
40+
/// implementation returns placeholder text.
41+
fn name(&self) -> &str {
42+
"WindowExpr: default name"
43+
}
44+
45+
/// expressions that are passed to the WindowAccumulator.
46+
/// Functions which take a single input argument, such as `sum`, return a single [`datafusion_expr::expr::Expr`],
47+
/// others (e.g. `cov`) return many.
48+
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
49+
50+
/// evaluate the window function arguments against the batch and return
51+
/// array ref, normally the resulting vec is a single element one.
52+
fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
53+
self.expressions()
54+
.iter()
55+
.map(|e| e.evaluate(batch))
56+
.map(|r| r.map(|v| v.into_array(batch.num_rows())))
57+
.collect()
58+
}
59+
60+
/// evaluate the window function values against the batch
61+
fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
62+
63+
/// evaluate the partition points given the sort columns; if the sort columns are
64+
/// empty then the result will be a single element vec of the whole column rows.
65+
fn evaluate_partition_points(
66+
&self,
67+
num_rows: usize,
68+
partition_columns: &[SortColumn],
69+
) -> Result<Vec<Range<usize>>> {
70+
if partition_columns.is_empty() {
71+
Ok(vec![Range {
72+
start: 0,
73+
end: num_rows,
74+
}])
75+
} else {
76+
Ok(lexicographical_partition_ranges(partition_columns)
77+
.map_err(DataFusionError::ArrowError)?
78+
.collect::<Vec<_>>())
79+
}
80+
}
81+
82+
/// expressions that's from the window function's partition by clause, empty if absent
83+
fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>];
84+
85+
/// expressions that's from the window function's order by clause, empty if absent
86+
fn order_by(&self) -> &[PhysicalSortExpr];
87+
88+
/// get partition columns that can be used for partitioning, empty if absent
89+
fn partition_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> {
90+
self.partition_by()
91+
.iter()
92+
.map(|expr| {
93+
PhysicalSortExpr {
94+
expr: expr.clone(),
95+
options: SortOptions::default(),
96+
}
97+
.evaluate_to_sort_column(batch)
98+
})
99+
.collect()
100+
}
101+
102+
/// get sort columns that can be used for peer evaluation, empty if absent
103+
fn sort_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> {
104+
let mut sort_columns = self.partition_columns(batch)?;
105+
let order_by_columns = self
106+
.order_by()
107+
.iter()
108+
.map(|e| e.evaluate_to_sort_column(batch))
109+
.collect::<Result<Vec<SortColumn>>>()?;
110+
sort_columns.extend(order_by_columns);
111+
Ok(sort_columns)
112+
}
113+
}

datafusion/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ row = []
5454
[dependencies]
5555
datafusion-common = { path = "../datafusion-common", version = "7.0.0" }
5656
datafusion-expr = { path = "../datafusion-expr", version = "7.0.0" }
57+
datafusion-physical-expr = { path = "../datafusion-physical-expr", version = "7.0.0" }
5758
ahash = { version = "0.7", default-features = false }
5859
hashbrown = { version = "0.12", features = ["raw"] }
5960
arrow = { version = "9.0.0", features = ["prettyprint"] }

0 commit comments

Comments
 (0)