Skip to content

Commit 20b3042

Browse files
committed
Wrap immutable plan parts into Arc
- Closes #19852

Improve the performance of query planning and plan state reset by making plan node clones cheap:

- Store the projection as `Option<Arc<[usize]>>` instead of `Option<Vec<usize>>` in `FilterExec`, `HashJoinExec`, and `NestedLoopJoinExec`.
- Store exprs as `Arc<[ProjectionExpr]>` instead of `Vec` in `ProjectionExprs`.
- Store `Arc`-wrapped aggregation, filter, and group-by expressions within `AggregateExec`.
1 parent 6524d91 commit 20b3042

File tree

26 files changed

+307
-140
lines changed

26 files changed

+307
-140
lines changed

datafusion-examples/examples/custom_data_source/custom_datasource.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,8 @@ impl CustomExec {
202202
schema: SchemaRef,
203203
db: CustomDataSource,
204204
) -> Self {
205-
let projected_schema = project_schema(&schema, projections).unwrap();
205+
let projected_schema =
206+
project_schema(&schema, projections.map(|v| v.as_ref())).unwrap();
206207
let cache = Self::compute_properties(projected_schema.clone());
207208
Self {
208209
db,

datafusion/catalog-listing/src/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ impl TableProvider for ListingTable {
522522

523523
// if no files need to be read, return an `EmptyExec`
524524
if partitioned_file_lists.is_empty() {
525-
let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
525+
let projected_schema = project_schema(&self.schema(), projection.as_deref())?;
526526
return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
527527
}
528528

datafusion/common/src/stats.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -391,8 +391,8 @@ impl Statistics {
391391
/// For example, if we had statistics for columns `{"a", "b", "c"}`,
392392
/// projecting to `vec![2, 1]` would return statistics for columns `{"c",
393393
/// "b"}`.
394-
pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
395-
let Some(projection) = projection else {
394+
pub fn project(mut self, projection: Option<&[usize]>) -> Self {
395+
let Some(projection) = projection.map(AsRef::as_ref) else {
396396
return self;
397397
};
398398

@@ -410,7 +410,7 @@ impl Statistics {
410410
.map(Slot::Present)
411411
.collect();
412412

413-
for idx in projection {
413+
for idx in projection.iter() {
414414
let next_idx = self.column_statistics.len();
415415
let slot = std::mem::replace(
416416
columns.get_mut(*idx).expect("projection out of bounds"),
@@ -1066,29 +1066,29 @@ mod tests {
10661066

10671067
#[test]
10681068
fn test_project_none() {
1069-
let projection = None;
1070-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1069+
let projection: Option<&[usize]> = None;
1070+
let stats = make_stats(vec![10, 20, 30]).project(projection);
10711071
assert_eq!(stats, make_stats(vec![10, 20, 30]));
10721072
}
10731073

10741074
#[test]
10751075
fn test_project_empty() {
10761076
let projection = Some(vec![]);
1077-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1077+
let stats = make_stats(vec![10, 20, 30]).project(projection.as_deref());
10781078
assert_eq!(stats, make_stats(vec![]));
10791079
}
10801080

10811081
#[test]
10821082
fn test_project_swap() {
10831083
let projection = Some(vec![2, 1]);
1084-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1084+
let stats = make_stats(vec![10, 20, 30]).project(projection.as_deref());
10851085
assert_eq!(stats, make_stats(vec![30, 20]));
10861086
}
10871087

10881088
#[test]
10891089
fn test_project_repeated() {
10901090
let projection = Some(vec![1, 2, 1, 1, 0, 2]);
1091-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1091+
let stats = make_stats(vec![10, 20, 30]).project(projection.as_deref());
10921092
assert_eq!(stats, make_stats(vec![20, 30, 20, 20, 10, 30]));
10931093
}
10941094

datafusion/common/src/utils/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ use std::thread::available_parallelism;
5959
///
6060
/// // Pick columns 'c' and 'b'
6161
/// let projection = Some(vec![2, 1]);
62-
/// let projected_schema = project_schema(&schema, projection.as_ref()).unwrap();
62+
/// let projected_schema = project_schema(&schema, projection.as_deref()).unwrap();
6363
///
6464
/// let expected_schema = SchemaRef::new(Schema::new(vec![
6565
/// Field::new("c", DataType::Utf8, true),
@@ -70,9 +70,9 @@ use std::thread::available_parallelism;
7070
/// ```
7171
pub fn project_schema(
7272
schema: &SchemaRef,
73-
projection: Option<&Vec<usize>>,
73+
projection: Option<&[usize]>,
7474
) -> Result<SchemaRef> {
75-
let schema = match projection {
75+
let schema = match projection.map(AsRef::as_ref) {
7676
Some(columns) => Arc::new(schema.project(columns)?),
7777
None => Arc::clone(schema),
7878
};

datafusion/core/src/datasource/empty.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ impl TableProvider for EmptyTable {
7777
_limit: Option<usize>,
7878
) -> Result<Arc<dyn ExecutionPlan>> {
7979
// even though there is no data, projections apply
80-
let projected_schema = project_schema(&self.schema, projection)?;
80+
let projected_schema =
81+
project_schema(&self.schema, projection.map(AsRef::as_ref))?;
8182
Ok(Arc::new(
8283
EmptyExec::new(projected_schema).with_partitions(self.partitions),
8384
))

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -984,7 +984,7 @@ impl DefaultPhysicalPlanner {
984984
// project the output columns excluding the async functions
985985
// The async functions are always appended to the end of the schema.
986986
.apply_projection(Some(
987-
(0..input.schema().fields().len()).collect(),
987+
(0..input.schema().fields().len()).collect::<Vec<_>>(),
988988
))?
989989
.with_batch_size(session_state.config().batch_size())
990990
.build()?

datafusion/core/tests/custom_sources_cases/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ impl CustomExecutionPlan {
8686
fn new(projection: Option<Vec<usize>>) -> Self {
8787
let schema = TEST_CUSTOM_SCHEMA_REF!();
8888
let schema =
89-
project_schema(&schema, projection.as_ref()).expect("projected schema");
89+
project_schema(&schema, projection.as_deref()).expect("projected schema");
9090
let cache = Self::compute_properties(schema);
9191
Self { projection, cache }
9292
}

datafusion/core/tests/physical_optimizer/join_selection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -762,7 +762,7 @@ async fn test_hash_join_swap_on_joins_with_projections(
762762
"ProjectionExec won't be added above if HashJoinExec contains embedded projection",
763763
);
764764

765-
assert_eq!(swapped_join.projection, Some(vec![0_usize]));
765+
assert_eq!(swapped_join.projection.as_ref().unwrap(), [0_usize]);
766766
assert_eq!(swapped.schema().fields.len(), 1);
767767
assert_eq!(swapped.schema().fields[0].name(), "small_col");
768768
Ok(())

datafusion/datasource/src/memory.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ impl MemorySourceConfig {
262262
schema: SchemaRef,
263263
projection: Option<Vec<usize>>,
264264
) -> Result<Self> {
265-
let projected_schema = project_schema(&schema, projection.as_ref())?;
265+
let projected_schema = project_schema(&schema, projection.as_deref())?;
266266
Ok(Self {
267267
partitions: partitions.to_vec(),
268268
schema,

0 commit comments

Comments
 (0)