Commit d3cc283

[fix] improvements following review

apache#1141 (review)

1 parent c5cfcfb

2 files changed: +145 -87 lines

datafusion/src/datasource/listing/helpers.rs

Lines changed: 78 additions & 84 deletions
@@ -37,7 +37,7 @@ use log::debug;
 use crate::{
     error::Result,
     execution::context::ExecutionContext,
-    logical_plan::{self, Expr},
+    logical_plan::{self, Expr, ExpressionVisitor, Recursion},
     physical_plan::functions::Volatility,
     scalar::ScalarValue,
 };
@@ -51,93 +51,85 @@ const FILE_SIZE_COLUMN_NAME: &str = "_df_part_file_size_";
 const FILE_PATH_COLUMN_NAME: &str = "_df_part_file_path_";
 const FILE_MODIFIED_COLUMN_NAME: &str = "_df_part_file_modified_";

+/// The `ExpressionVisitor` for `expr_applicable_for_cols`. Walks the tree to
+/// validate that the given expression is applicable with only the `col_names`
+/// set of columns.
+struct ApplicabilityVisitor<'a> {
+    col_names: &'a [String],
+    is_applicable: &'a mut bool,
+}
+
+impl ApplicabilityVisitor<'_> {
+    fn is_volatility_applicable(self, volatility: Volatility) -> Recursion<Self> {
+        match volatility {
+            Volatility::Immutable => Recursion::Continue(self),
+            // TODO: Stable functions could be `applicable`, but that would require access to the context
+            Volatility::Stable | Volatility::Volatile => {
+                *self.is_applicable = false;
+                Recursion::Stop(self)
+            }
+        }
+    }
+}
+
+impl ExpressionVisitor for ApplicabilityVisitor<'_> {
+    fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>> {
+        let rec = match expr {
+            Expr::Column(logical_plan::Column { ref name, .. }) => {
+                *self.is_applicable &= self.col_names.contains(name);
+                Recursion::Stop(self) // leaf node anyway
+            }
+            Expr::Literal(_)
+            | Expr::Alias(_, _)
+            | Expr::ScalarVariable(_)
+            | Expr::Not(_)
+            | Expr::IsNotNull(_)
+            | Expr::IsNull(_)
+            | Expr::Negative(_)
+            | Expr::Cast { .. }
+            | Expr::TryCast { .. }
+            | Expr::BinaryExpr { .. }
+            | Expr::Between { .. }
+            | Expr::InList { .. }
+            | Expr::Case { .. } => Recursion::Continue(self),
+
+            Expr::ScalarFunction { fun, .. } => {
+                self.is_volatility_applicable(fun.volatility())
+            }
+            Expr::ScalarUDF { fun, .. } => {
+                self.is_volatility_applicable(fun.signature.volatility)
+            }
+
+            // TODO other expressions are not handled yet:
+            // - AGGREGATE, WINDOW and SORT should not end up in filter conditions, except maybe in some edge cases
+            // - Can `Wildcard` be considered as a `Literal`?
+            // - ScalarVariable could be `applicable`, but that would require access to the context
+            Expr::AggregateUDF { .. }
+            | Expr::AggregateFunction { .. }
+            | Expr::Sort { .. }
+            | Expr::WindowFunction { .. }
+            | Expr::Wildcard => {
+                *self.is_applicable = false;
+                Recursion::Stop(self)
+            }
+        };
+        Ok(rec)
+    }
+}
+
 /// Check whether the given expression can be resolved using only the columns `col_names`.
 /// This means that if this function returns true:
 /// - the table provider can filter the table partition values with this expression
 /// - the expression can be marked as `TableProviderFilterPushDown::Exact` once this filtering
 /// was performed
 pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
-    match expr {
-        // leaf
-        Expr::Literal(_) => true,
-        // TODO how to handle qualified / unqualified names?
-        Expr::Column(logical_plan::Column { ref name, .. }) => col_names.contains(name),
-        // unary
-        Expr::Alias(child, _)
-        | Expr::Not(child)
-        | Expr::IsNotNull(child)
-        | Expr::IsNull(child)
-        | Expr::Negative(child)
-        | Expr::Cast { expr: child, .. }
-        | Expr::TryCast { expr: child, .. } => expr_applicable_for_cols(col_names, child),
-        // binary
-        Expr::BinaryExpr {
-            ref left,
-            ref right,
-            ..
-        } => {
-            expr_applicable_for_cols(col_names, left)
-                && expr_applicable_for_cols(col_names, right)
-        }
-        // ternary
-        Expr::Between {
-            expr: item,
-            low,
-            high,
-            ..
-        } => {
-            expr_applicable_for_cols(col_names, item)
-                && expr_applicable_for_cols(col_names, low)
-                && expr_applicable_for_cols(col_names, high)
-        }
-        // variadic
-        Expr::ScalarFunction { fun, args } => match fun.volatility() {
-            Volatility::Immutable => args
-                .iter()
-                .all(|arg| expr_applicable_for_cols(col_names, arg)),
-            // TODO: Stable functions could be `applicable`, but that would require access to the context
-            Volatility::Stable => false,
-            Volatility::Volatile => false,
-        },
-        Expr::ScalarUDF { fun, args } => match fun.signature.volatility {
-            Volatility::Immutable => args
-                .iter()
-                .all(|arg| expr_applicable_for_cols(col_names, arg)),
-            // TODO: Stable functions could be `applicable`, but that would require access to the context
-            Volatility::Stable => false,
-            Volatility::Volatile => false,
-        },
-        Expr::InList {
-            expr: item, list, ..
-        } => {
-            expr_applicable_for_cols(col_names, item)
-                && list.iter().all(|e| expr_applicable_for_cols(col_names, e))
-        }
-        Expr::Case {
-            expr,
-            when_then_expr,
-            else_expr,
-        } => {
-            let expr_constant = expr
-                .as_ref()
-                .map(|e| expr_applicable_for_cols(col_names, e))
-                .unwrap_or(true);
-            let else_constant = else_expr
-                .as_ref()
-                .map(|e| expr_applicable_for_cols(col_names, e))
-                .unwrap_or(true);
-            let when_then_constant = when_then_expr.iter().all(|(w, th)| {
-                expr_applicable_for_cols(col_names, w)
-                    && expr_applicable_for_cols(col_names, th)
-            });
-            expr_constant && else_constant && when_then_constant
-        }
-        // TODO other expressions are not handled yet:
-        // - AGGREGATE, WINDOW and SORT should not end up in filter conditions, except maybe in some edge cases
-        // - Can `Wildcard` be considered as a `Literal`?
-        // - ScalarVariable could be `applicable`, but that would require access to the context
-        _ => false,
-    }
+    let mut is_applicable = true;
+    expr.accept(ApplicabilityVisitor {
+        col_names,
+        is_applicable: &mut is_applicable,
+    })
+    .unwrap();
+    is_applicable
 }

 /// Partition the list of files into `n` groups
@@ -191,8 +183,10 @@ pub async fn pruned_partition_list(
         .collect();
     let stream_path = table_path.to_owned();
     if applicable_filters.is_empty() {
-        // parse the partition values while listing all the files
-        // TODO we might avoid parsing the partition values if they are not used in any projection
+        // Parse the partition values while listing all the files
+        // Note: We might avoid parsing the partition values if they are not used in any projection,
+        // but the cost of parsing will likely be far dominated by the time to fetch the listing from
+        // the object store.
         let table_partition_cols_stream = table_partition_cols.to_vec();
         Ok(Box::pin(
             store
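
The visitor-based rewrite above keeps the behavior of `expr_applicable_for_cols` unchanged. As a minimal usage sketch (hypothetical, not part of the commit; it assumes the helper is in scope and uses the `col`/`lit` expression builders from `datafusion::logical_plan`):

    use datafusion::logical_plan::{col, lit};

    // Hypothetical table partitioned by `year` and `day`.
    let partition_cols = vec!["year".to_string(), "day".to_string()];

    // `year = '2021'` only references partition columns: applicable, so the
    // filter can be marked `TableProviderFilterPushDown::Exact`.
    assert!(expr_applicable_for_cols(
        &partition_cols,
        &col("year").eq(lit("2021"))
    ));

    // `c1 = 10` references a file column: not applicable.
    assert!(!expr_applicable_for_cols(
        &partition_cols,
        &col("c1").eq(lit(10))
    ));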

datafusion/src/physical_plan/file_format/mod.rs

Lines changed: 67 additions & 3 deletions
@@ -166,7 +166,12 @@ impl<'a> Display for FileGroupsDisplay<'a> {
     }
 }

-/// A helper that projects partition columns into the file record batches
+/// A helper that projects partition columns into the file record batches.
+///
+/// One interesting trick is the usage of a cache for the key buffers of the partition column
+/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them
+/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches,
+/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count).
 struct PartitionColumnProjector {
     /// An Arrow buffer initialized to zeros that represents the key array of all partition
     /// columns (partition columns are materialized by dictionary arrays with only one
@@ -202,7 +207,7 @@ impl PartitionColumnProjector {
         }
     }

-    // Transform the batch read from the fileby inserting the partitioning columns
+    // Transform the batch read from the file by inserting the partitioning columns
     // to the right positions as deduced from `projected_schema`
     // - file_batch: batch read from the file, with internal projection applied
     // - partition_values: the list of partition values, one for each partition column
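
The key-buffer cache described in the new `PartitionColumnProjector` doc comment above can be illustrated outside of Arrow. This is a hypothetical sketch of the re-use idea only, not the actual buffer-based implementation:

    // Hypothetical sketch: partition columns are constant within a file, so
    // every dictionary key is 0. The zeroed buffer only ever grows, and
    // smaller batches borrow a prefix of it, keeping the memory cost at
    // O(batch_size) instead of O(record_count).
    struct ZeroKeyCache {
        zeros: Vec<u8>,
    }

    impl ZeroKeyCache {
        fn keys(&mut self, batch_len: usize) -> &[u8] {
            if self.zeros.len() < batch_len {
                // grow only when a batch larger than any seen so far arrives
                self.zeros.resize(batch_len, 0);
            }
            // smaller batches simply re-use a prefix of the cached buffer
            &self.zeros[..batch_len]
        }
    }

The test additions below (a first batch, then a larger one, then a smaller one) exercise exactly these grow-and-reuse paths.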
@@ -379,6 +384,8 @@ mod tests {
         let (proj_schema, _) = conf.project();
         // created a projector for that projected schema
         let mut proj = PartitionColumnProjector::new(proj_schema, &partition_cols);
+
+        // project first batch
         let projected_batch = proj
             .project(
                 // file_batch is ok here because we kept all the file cols in the projection
@@ -390,7 +397,6 @@ mod tests {
                 ],
             )
             .expect("Projection of partition columns into record batch failed");
-
         let expected = vec![
             "+---+----+----+------+-----+",
             "| a | b  | c  | year | day |",
@@ -401,6 +407,64 @@ mod tests {
             "+---+----+----+------+-----+",
         ];
         crate::assert_batches_eq!(expected, &[projected_batch]);
+
+        // project another batch that is larger than the previous one
+        let file_batch = build_table_i32(
+            ("a", &vec![5, 6, 7, 8, 9]),
+            ("b", &vec![-10, -9, -8, -7, -6]),
+            ("c", &vec![12, 13, 14, 15, 16]),
+        );
+        let projected_batch = proj
+            .project(
+                // file_batch is ok here because we kept all the file cols in the projection
+                file_batch,
+                &[
+                    ScalarValue::Utf8(Some("2021".to_owned())),
+                    ScalarValue::Utf8(Some("10".to_owned())),
+                    ScalarValue::Utf8(Some("27".to_owned())),
+                ],
+            )
+            .expect("Projection of partition columns into record batch failed");
+        let expected = vec![
+            "+---+-----+----+------+-----+",
+            "| a | b   | c  | year | day |",
+            "+---+-----+----+------+-----+",
+            "| 5 | -10 | 12 | 2021 | 27  |",
+            "| 6 | -9  | 13 | 2021 | 27  |",
+            "| 7 | -8  | 14 | 2021 | 27  |",
+            "| 8 | -7  | 15 | 2021 | 27  |",
+            "| 9 | -6  | 16 | 2021 | 27  |",
+            "+---+-----+----+------+-----+",
+        ];
+        crate::assert_batches_eq!(expected, &[projected_batch]);
+
+        // project another batch that is smaller than the previous one
+        let file_batch = build_table_i32(
+            ("a", &vec![0, 1, 3]),
+            ("b", &vec![2, 3, 4]),
+            ("c", &vec![4, 5, 6]),
+        );
+        let projected_batch = proj
+            .project(
+                // file_batch is ok here because we kept all the file cols in the projection
+                file_batch,
+                &[
+                    ScalarValue::Utf8(Some("2021".to_owned())),
+                    ScalarValue::Utf8(Some("10".to_owned())),
+                    ScalarValue::Utf8(Some("28".to_owned())),
+                ],
+            )
+            .expect("Projection of partition columns into record batch failed");
+        let expected = vec![
+            "+---+---+---+------+-----+",
+            "| a | b | c | year | day |",
+            "+---+---+---+------+-----+",
+            "| 0 | 2 | 4 | 2021 | 28  |",
+            "| 1 | 3 | 5 | 2021 | 28  |",
+            "| 3 | 4 | 6 | 2021 | 28  |",
+            "+---+---+---+------+-----+",
+        ];
+        crate::assert_batches_eq!(expected, &[projected_batch]);
     }

     // sets default for configs that play no role in projections
