Skip to content

Commit 0303913

Browse files
committed
Merge remote-tracking branch 'apache/master' into file-partitioning
2 parents cb0789e + 57765cd commit 0303913

File tree

18 files changed

+571
-95
lines changed

18 files changed

+571
-95
lines changed

.github/workflows/python_build.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ jobs:
106106
- run: cat LICENSE.txt
107107
- name: Build wheels
108108
run: |
109+
export RUSTFLAGS='-C target-cpu=skylake'
109110
docker run --rm -v $(pwd)/..:/io \
110111
--workdir /io/python \
111112
konstin2/maturin:v0.11.2 \

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,7 @@ members = [
2929
]
3030

3131
exclude = ["python"]
32+
33+
[profile.release]
34+
lto = true
35+
codegen-units = 1

datafusion/src/execution/context.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -893,15 +893,6 @@ impl ExecutionConfig {
893893
Default::default()
894894
}
895895

896-
/// Deprecated. Use with_target_partitions instead.
897-
#[deprecated(
898-
since = "5.1.0",
899-
note = "This method is deprecated in favor of `with_target_partitions`."
900-
)]
901-
pub fn with_concurrency(self, n: usize) -> Self {
902-
self.with_target_partitions(n)
903-
}
904-
905896
/// Customize target_partitions
906897
pub fn with_target_partitions(mut self, n: usize) -> Self {
907898
// partition count must be greater than zero

datafusion/src/field_util.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Utility functions for complex field access
19+
20+
use arrow::datatypes::{DataType, Field};
21+
22+
use crate::error::{DataFusionError, Result};
23+
use crate::scalar::ScalarValue;
24+
25+
/// Returns the field access indexed by `key` from a [`DataType::List`]
26+
/// # Error
27+
/// Errors if
28+
/// * the `data_type` is not a Struct or,
29+
/// * there is no field key is not of the required index type
30+
pub fn get_indexed_field(data_type: &DataType, key: &ScalarValue) -> Result<Field> {
31+
match (data_type, key) {
32+
(DataType::List(lt), ScalarValue::Int64(Some(i))) => {
33+
if *i < 0 {
34+
Err(DataFusionError::Plan(format!(
35+
"List based indexed access requires a positive int, was {0}",
36+
i
37+
)))
38+
} else {
39+
Ok(Field::new(&i.to_string(), lt.data_type().clone(), false))
40+
}
41+
}
42+
(DataType::List(_), _) => Err(DataFusionError::Plan(
43+
"Only ints are valid as an indexed field in a list".to_string(),
44+
)),
45+
_ => Err(DataFusionError::Plan(
46+
"The expression to get an indexed field is only valid for `List` types"
47+
.to_string(),
48+
)),
49+
}
50+
}

datafusion/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ pub mod variable;
231231
pub use arrow;
232232
pub use parquet;
233233

234+
pub(crate) mod field_util;
234235
#[cfg(test)]
235236
pub mod test;
236237
pub mod test_util;

datafusion/src/logical_plan/expr.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
2121
pub use super::Operator;
2222
use crate::error::{DataFusionError, Result};
23+
use crate::field_util::get_indexed_field;
2324
use crate::logical_plan::{window_frames, DFField, DFSchema, LogicalPlan};
2425
use crate::physical_plan::functions::Volatility;
2526
use crate::physical_plan::{
@@ -245,6 +246,13 @@ pub enum Expr {
245246
IsNull(Box<Expr>),
246247
/// arithmetic negation of an expression, the operand must be of a signed numeric data type
247248
Negative(Box<Expr>),
249+
/// Returns the field of a [`ListArray`] by key
250+
GetIndexedField {
251+
/// the expression to take the field from
252+
expr: Box<Expr>,
253+
/// The name of the field to take
254+
key: ScalarValue,
255+
},
248256
/// Whether an expression is between a given range.
249257
Between {
250258
/// The value to compare
@@ -433,6 +441,11 @@ impl Expr {
433441
Expr::Wildcard => Err(DataFusionError::Internal(
434442
"Wildcard expressions are not valid in a logical query plan".to_owned(),
435443
)),
444+
Expr::GetIndexedField { ref expr, key } => {
445+
let data_type = expr.get_type(schema)?;
446+
447+
get_indexed_field(&data_type, key).map(|x| x.data_type().clone())
448+
}
436449
}
437450
}
438451

@@ -488,6 +501,10 @@ impl Expr {
488501
Expr::Wildcard => Err(DataFusionError::Internal(
489502
"Wildcard expressions are not valid in a logical query plan".to_owned(),
490503
)),
504+
Expr::GetIndexedField { ref expr, key } => {
505+
let data_type = expr.get_type(input_schema)?;
506+
get_indexed_field(&data_type, key).map(|x| x.is_nullable())
507+
}
491508
}
492509
}
493510

@@ -763,6 +780,7 @@ impl Expr {
763780
.try_fold(visitor, |visitor, arg| arg.accept(visitor))
764781
}
765782
Expr::Wildcard => Ok(visitor),
783+
Expr::GetIndexedField { ref expr, .. } => expr.accept(visitor),
766784
}?;
767785

768786
visitor.post_visit(self)
@@ -923,6 +941,10 @@ impl Expr {
923941
negated,
924942
},
925943
Expr::Wildcard => Expr::Wildcard,
944+
Expr::GetIndexedField { expr, key } => Expr::GetIndexedField {
945+
expr: rewrite_boxed(expr, rewriter)?,
946+
key,
947+
},
926948
};
927949

928950
// now rewrite this expression itself
@@ -1799,6 +1821,9 @@ impl fmt::Debug for Expr {
17991821
}
18001822
}
18011823
Expr::Wildcard => write!(f, "*"),
1824+
Expr::GetIndexedField { ref expr, key } => {
1825+
write!(f, "({:?})[{}]", expr, key)
1826+
}
18021827
}
18031828
}
18041829
}
@@ -1879,6 +1904,10 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
18791904
let expr = create_name(expr, input_schema)?;
18801905
Ok(format!("{} IS NOT NULL", expr))
18811906
}
1907+
Expr::GetIndexedField { expr, key } => {
1908+
let expr = create_name(expr, input_schema)?;
1909+
Ok(format!("{}[{}]", expr, key))
1910+
}
18821911
Expr::ScalarFunction { fun, args, .. } => {
18831912
create_function_name(&fun.to_string(), false, args, input_schema)
18841913
}

datafusion/src/optimizer/common_subexpr_eliminate.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,10 @@ impl ExprIdentifierVisitor<'_> {
442442
Expr::Wildcard => {
443443
desc.push_str("Wildcard-");
444444
}
445+
Expr::GetIndexedField { key, .. } => {
446+
desc.push_str("GetIndexedField-");
447+
desc.push_str(&key.to_string());
448+
}
445449
}
446450

447451
desc

datafusion/src/optimizer/constant_folding.rs

Lines changed: 11 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,13 @@
1717

1818
//! Constant folding and algebraic simplification
1919
20-
use std::sync::Arc;
21-
2220
use arrow::datatypes::DataType;
2321

2422
use crate::error::Result;
2523
use crate::execution::context::ExecutionProps;
2624
use crate::logical_plan::{DFSchemaRef, Expr, ExprRewriter, LogicalPlan, Operator};
2725
use crate::optimizer::optimizer::OptimizerRule;
2826
use crate::optimizer::utils;
29-
use crate::physical_plan::functions::BuiltinScalarFunction;
3027
use crate::scalar::ScalarValue;
3128

3229
/// Simplifies plans by rewriting [`Expr`]`s evaluating constants
@@ -61,18 +58,14 @@ impl OptimizerRule for ConstantFolding {
6158
// children plans.
6259
let mut simplifier = Simplifier {
6360
schemas: plan.all_schemas(),
64-
execution_props,
6561
};
6662

67-
let mut const_evaluator = utils::ConstEvaluator::new();
63+
let mut const_evaluator = utils::ConstEvaluator::new(execution_props);
6864

6965
match plan {
70-
LogicalPlan::Filter { predicate, input } => Ok(LogicalPlan::Filter {
71-
predicate: predicate.clone().rewrite(&mut simplifier)?,
72-
input: Arc::new(self.optimize(input, execution_props)?),
73-
}),
74-
// Rest: recurse into plan, apply optimization where possible
75-
LogicalPlan::Projection { .. }
66+
// Recurse into plan, apply optimization where possible
67+
LogicalPlan::Filter { .. }
68+
| LogicalPlan::Projection { .. }
7669
| LogicalPlan::Window { .. }
7770
| LogicalPlan::Aggregate { .. }
7871
| LogicalPlan::Repartition { .. }
@@ -130,7 +123,6 @@ impl OptimizerRule for ConstantFolding {
130123
struct Simplifier<'a> {
131124
/// input schemas
132125
schemas: Vec<&'a DFSchemaRef>,
133-
execution_props: &'a ExecutionProps,
134126
}
135127

136128
impl<'a> Simplifier<'a> {
@@ -228,15 +220,6 @@ impl<'a> ExprRewriter for Simplifier<'a> {
228220
Expr::Not(inner)
229221
}
230222
}
231-
// convert now() --> the time in `ExecutionProps`
232-
Expr::ScalarFunction {
233-
fun: BuiltinScalarFunction::Now,
234-
..
235-
} => Expr::Literal(ScalarValue::TimestampNanosecond(Some(
236-
self.execution_props
237-
.query_execution_start_time
238-
.timestamp_nanos(),
239-
))),
240223
expr => {
241224
// no additional rewrites possible
242225
expr
@@ -248,10 +231,13 @@ impl<'a> ExprRewriter for Simplifier<'a> {
248231

249232
#[cfg(test)]
250233
mod tests {
234+
use std::sync::Arc;
235+
251236
use super::*;
252237
use crate::{
253238
assert_contains,
254239
logical_plan::{col, lit, max, min, DFField, DFSchema, LogicalPlanBuilder},
240+
physical_plan::functions::BuiltinScalarFunction,
255241
};
256242

257243
use arrow::datatypes::*;
@@ -282,7 +268,6 @@ mod tests {
282268
let schema = expr_test_schema();
283269
let mut rewriter = Simplifier {
284270
schemas: vec![&schema],
285-
execution_props: &ExecutionProps::new(),
286271
};
287272

288273
assert_eq!(
@@ -298,7 +283,6 @@ mod tests {
298283
let schema = expr_test_schema();
299284
let mut rewriter = Simplifier {
300285
schemas: vec![&schema],
301-
execution_props: &ExecutionProps::new(),
302286
};
303287

304288
// x = null is always null
@@ -334,7 +318,6 @@ mod tests {
334318
let schema = expr_test_schema();
335319
let mut rewriter = Simplifier {
336320
schemas: vec![&schema],
337-
execution_props: &ExecutionProps::new(),
338321
};
339322

340323
assert_eq!(col("c2").get_type(&schema)?, DataType::Boolean);
@@ -365,7 +348,6 @@ mod tests {
365348
let schema = expr_test_schema();
366349
let mut rewriter = Simplifier {
367350
schemas: vec![&schema],
368-
execution_props: &ExecutionProps::new(),
369351
};
370352

371353
// When one of the operand is not of boolean type, folding the other boolean constant will
@@ -405,7 +387,6 @@ mod tests {
405387
let schema = expr_test_schema();
406388
let mut rewriter = Simplifier {
407389
schemas: vec![&schema],
408-
execution_props: &ExecutionProps::new(),
409390
};
410391

411392
assert_eq!(col("c2").get_type(&schema)?, DataType::Boolean);
@@ -441,7 +422,6 @@ mod tests {
441422
let schema = expr_test_schema();
442423
let mut rewriter = Simplifier {
443424
schemas: vec![&schema],
444-
execution_props: &ExecutionProps::new(),
445425
};
446426

447427
// when one of the operand is not of boolean type, folding the other boolean constant will
@@ -477,7 +457,6 @@ mod tests {
477457
let schema = expr_test_schema();
478458
let mut rewriter = Simplifier {
479459
schemas: vec![&schema],
480-
execution_props: &ExecutionProps::new(),
481460
};
482461

483462
assert_eq!(
@@ -753,27 +732,6 @@ mod tests {
753732
}
754733
}
755734

756-
#[test]
757-
fn single_now_expr() {
758-
let table_scan = test_table_scan().unwrap();
759-
let proj = vec![now_expr()];
760-
let time = Utc::now();
761-
let plan = LogicalPlanBuilder::from(table_scan)
762-
.project(proj)
763-
.unwrap()
764-
.build()
765-
.unwrap();
766-
767-
let expected = format!(
768-
"Projection: TimestampNanosecond({})\
769-
\n TableScan: test projection=None",
770-
time.timestamp_nanos()
771-
);
772-
let actual = get_optimized_plan_formatted(&plan, &time);
773-
774-
assert_eq!(expected, actual);
775-
}
776-
777735
#[test]
778736
fn multiple_now_expr() {
779737
let table_scan = test_table_scan().unwrap();
@@ -838,17 +796,16 @@ mod tests {
838796
// now() < cast(to_timestamp(...) as int) + 5000000000
839797
let plan = LogicalPlanBuilder::from(table_scan)
840798
.filter(
841-
now_expr()
799+
cast_to_int64_expr(now_expr())
842800
.lt(cast_to_int64_expr(to_timestamp_expr(ts_string)) + lit(50000)),
843801
)
844802
.unwrap()
845803
.build()
846804
.unwrap();
847805

848-
// Note that constant folder should be able to run again and fold
849-
// this whole expression down to a single constant;
850-
// https://github.com/apache/arrow-datafusion/issues/1160
851-
let expected = "Filter: TimestampNanosecond(1599566400000000000) < CAST(totimestamp(Utf8(\"2020-09-08T12:05:00+00:00\")) AS Int64) + Int32(50000)\
806+
// Note that constant folder runs and folds the entire
807+
// expression down to a single constant (true)
808+
let expected = "Filter: Boolean(true)\
852809
\n TableScan: test projection=None";
853810
let actual = get_optimized_plan_formatted(&plan, &time);
854811

0 commit comments

Comments
 (0)