Skip to content

Commit d81c961

Browse files
Projection Pushdown over StreamingTableExec (#8299)
* Projection above streaming table can be removed * Review --------- Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
1 parent 2071259 commit d81c961

File tree

2 files changed

+199
-16
lines changed

2 files changed

+199
-16
lines changed

datafusion/core/src/physical_optimizer/projection_pushdown.rs

Lines changed: 172 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
//! projections one by one if the operator below is amenable to this. If a
2121
//! projection reaches a source, it can even dissappear from the plan entirely.
2222
23+
use std::sync::Arc;
24+
2325
use super::output_requirements::OutputRequirementExec;
2426
use super::PhysicalOptimizerRule;
2527
use crate::datasource::physical_plan::CsvExec;
@@ -39,18 +41,17 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
3941
use crate::physical_plan::{Distribution, ExecutionPlan};
4042

4143
use arrow_schema::SchemaRef;
42-
4344
use datafusion_common::config::ConfigOptions;
4445
use datafusion_common::tree_node::{Transformed, TreeNode};
4546
use datafusion_common::JoinSide;
4647
use datafusion_physical_expr::expressions::Column;
4748
use datafusion_physical_expr::{
4849
Partitioning, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
4950
};
51+
use datafusion_physical_plan::streaming::StreamingTableExec;
5052
use datafusion_physical_plan::union::UnionExec;
5153

5254
use itertools::Itertools;
53-
use std::sync::Arc;
5455

5556
/// This rule inspects [`ProjectionExec`]'s in the given physical plan and tries to
5657
/// remove or swap with its child.
@@ -135,6 +136,8 @@ pub fn remove_unnecessary_projections(
135136
try_swapping_with_sort_merge_join(projection, sm_join)?
136137
} else if let Some(sym_join) = input.downcast_ref::<SymmetricHashJoinExec>() {
137138
try_swapping_with_sym_hash_join(projection, sym_join)?
139+
} else if let Some(ste) = input.downcast_ref::<StreamingTableExec>() {
140+
try_swapping_with_streaming_table(projection, ste)?
138141
} else {
139142
// If the input plan of the projection is not one of the above, we
140143
// conservatively assume that pushing the projection down may hurt.
@@ -149,8 +152,8 @@ pub fn remove_unnecessary_projections(
149152
Ok(maybe_modified.map_or(Transformed::No(plan), Transformed::Yes))
150153
}
151154

152-
/// Tries to swap `projection` with its input (`csv`). If possible, performs
153-
/// the swap and returns [`CsvExec`] as the top plan. Otherwise, returns `None`.
155+
/// Tries to embed `projection` to its input (`csv`). If possible, returns
156+
/// [`CsvExec`] as the top plan. Otherwise, returns `None`.
154157
fn try_swapping_with_csv(
155158
projection: &ProjectionExec,
156159
csv: &CsvExec,
@@ -174,8 +177,8 @@ fn try_swapping_with_csv(
174177
})
175178
}
176179

177-
/// Tries to swap `projection` with its input (`memory`). If possible, performs
178-
/// the swap and returns [`MemoryExec`] as the top plan. Otherwise, returns `None`.
180+
/// Tries to embed `projection` to its input (`memory`). If possible, returns
181+
/// [`MemoryExec`] as the top plan. Otherwise, returns `None`.
179182
fn try_swapping_with_memory(
180183
projection: &ProjectionExec,
181184
memory: &MemoryExec,
@@ -197,10 +200,52 @@ fn try_swapping_with_memory(
197200
.transpose()
198201
}
199202

203+
/// Tries to embed `projection` to its input (`streaming table`).
204+
/// If possible, returns [`StreamingTableExec`] as the top plan. Otherwise,
205+
/// returns `None`.
206+
fn try_swapping_with_streaming_table(
207+
projection: &ProjectionExec,
208+
streaming_table: &StreamingTableExec,
209+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
210+
if !all_alias_free_columns(projection.expr()) {
211+
return Ok(None);
212+
}
213+
214+
let streaming_table_projections = streaming_table
215+
.projection()
216+
.as_ref()
217+
.map(|i| i.as_ref().to_vec());
218+
let new_projections =
219+
new_projections_for_columns(projection, &streaming_table_projections);
220+
221+
let mut lex_orderings = vec![];
222+
for lex_ordering in streaming_table.projected_output_ordering().into_iter() {
223+
let mut orderings = vec![];
224+
for order in lex_ordering {
225+
let Some(new_ordering) = update_expr(&order.expr, projection.expr(), false)?
226+
else {
227+
return Ok(None);
228+
};
229+
orderings.push(PhysicalSortExpr {
230+
expr: new_ordering,
231+
options: order.options,
232+
});
233+
}
234+
lex_orderings.push(orderings);
235+
}
236+
237+
StreamingTableExec::try_new(
238+
streaming_table.partition_schema().clone(),
239+
streaming_table.partitions().clone(),
240+
Some(&new_projections),
241+
lex_orderings,
242+
streaming_table.is_infinite(),
243+
)
244+
.map(|e| Some(Arc::new(e) as _))
245+
}
246+
200247
/// Unifies `projection` with its input (which is also a [`ProjectionExec`]).
201-
/// Two consecutive projections can always merge into a single projection unless
202-
/// the [`update_expr`] function does not support one of the expression
203-
/// types involved in the projection.
248+
/// Two consecutive projections can always merge into a single projection.
204249
fn try_unifying_projections(
205250
projection: &ProjectionExec,
206251
child: &ProjectionExec,
@@ -779,10 +824,6 @@ fn new_projections_for_columns(
779824
/// given the expressions `c@0`, `a@1` and `b@2`, and the [`ProjectionExec`] with
780825
/// an output schema of `a, c_new`, then `c@0` becomes `c_new@1`, `a@1` becomes
781826
/// `a@0`, but `b@2` results in `None` since the projection does not include `b`.
782-
///
783-
/// If the expression contains a `PhysicalExpr` variant that this function does
784-
/// not support, it will return `None`. An error can only be introduced if
785-
/// `CaseExpr::try_new` returns an error.
786827
fn update_expr(
787828
expr: &Arc<dyn PhysicalExpr>,
788829
projected_exprs: &[(Arc<dyn PhysicalExpr>, String)],
@@ -1102,10 +1143,11 @@ mod tests {
11021143
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
11031144
use crate::physical_plan::ExecutionPlan;
11041145

1105-
use arrow_schema::{DataType, Field, Schema, SortOptions};
1146+
use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions};
11061147
use datafusion_common::config::ConfigOptions;
11071148
use datafusion_common::{JoinSide, JoinType, Result, ScalarValue, Statistics};
11081149
use datafusion_execution::object_store::ObjectStoreUrl;
1150+
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
11091151
use datafusion_expr::{ColumnarValue, Operator};
11101152
use datafusion_physical_expr::expressions::{
11111153
BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr,
@@ -1115,8 +1157,11 @@ mod tests {
11151157
PhysicalSortRequirement, ScalarFunctionExpr,
11161158
};
11171159
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
1160+
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
11181161
use datafusion_physical_plan::union::UnionExec;
11191162

1163+
use itertools::Itertools;
1164+
11201165
#[test]
11211166
fn test_update_matching_exprs() -> Result<()> {
11221167
let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
@@ -1575,6 +1620,119 @@ mod tests {
15751620
Ok(())
15761621
}
15771622

1623+
#[test]
1624+
fn test_streaming_table_after_projection() -> Result<()> {
1625+
struct DummyStreamPartition {
1626+
schema: SchemaRef,
1627+
}
1628+
impl PartitionStream for DummyStreamPartition {
1629+
fn schema(&self) -> &SchemaRef {
1630+
&self.schema
1631+
}
1632+
fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1633+
unreachable!()
1634+
}
1635+
}
1636+
1637+
let streaming_table = StreamingTableExec::try_new(
1638+
Arc::new(Schema::new(vec![
1639+
Field::new("a", DataType::Int32, true),
1640+
Field::new("b", DataType::Int32, true),
1641+
Field::new("c", DataType::Int32, true),
1642+
Field::new("d", DataType::Int32, true),
1643+
Field::new("e", DataType::Int32, true),
1644+
])),
1645+
vec![Arc::new(DummyStreamPartition {
1646+
schema: Arc::new(Schema::new(vec![
1647+
Field::new("a", DataType::Int32, true),
1648+
Field::new("b", DataType::Int32, true),
1649+
Field::new("c", DataType::Int32, true),
1650+
Field::new("d", DataType::Int32, true),
1651+
Field::new("e", DataType::Int32, true),
1652+
])),
1653+
}) as _],
1654+
Some(&vec![0_usize, 2, 4, 3]),
1655+
vec![
1656+
vec![
1657+
PhysicalSortExpr {
1658+
expr: Arc::new(Column::new("e", 2)),
1659+
options: SortOptions::default(),
1660+
},
1661+
PhysicalSortExpr {
1662+
expr: Arc::new(Column::new("a", 0)),
1663+
options: SortOptions::default(),
1664+
},
1665+
],
1666+
vec![PhysicalSortExpr {
1667+
expr: Arc::new(Column::new("d", 3)),
1668+
options: SortOptions::default(),
1669+
}],
1670+
]
1671+
.into_iter(),
1672+
true,
1673+
)?;
1674+
let projection = Arc::new(ProjectionExec::try_new(
1675+
vec![
1676+
(Arc::new(Column::new("d", 3)), "d".to_string()),
1677+
(Arc::new(Column::new("e", 2)), "e".to_string()),
1678+
(Arc::new(Column::new("a", 0)), "a".to_string()),
1679+
],
1680+
Arc::new(streaming_table) as _,
1681+
)?) as _;
1682+
1683+
let after_optimize =
1684+
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
1685+
1686+
let result = after_optimize
1687+
.as_any()
1688+
.downcast_ref::<StreamingTableExec>()
1689+
.unwrap();
1690+
assert_eq!(
1691+
result.partition_schema(),
1692+
&Arc::new(Schema::new(vec![
1693+
Field::new("a", DataType::Int32, true),
1694+
Field::new("b", DataType::Int32, true),
1695+
Field::new("c", DataType::Int32, true),
1696+
Field::new("d", DataType::Int32, true),
1697+
Field::new("e", DataType::Int32, true),
1698+
]))
1699+
);
1700+
assert_eq!(
1701+
result.projection().clone().unwrap().to_vec(),
1702+
vec![3_usize, 4, 0]
1703+
);
1704+
assert_eq!(
1705+
result.projected_schema(),
1706+
&Schema::new(vec![
1707+
Field::new("d", DataType::Int32, true),
1708+
Field::new("e", DataType::Int32, true),
1709+
Field::new("a", DataType::Int32, true),
1710+
])
1711+
);
1712+
assert_eq!(
1713+
result.projected_output_ordering().into_iter().collect_vec(),
1714+
vec![
1715+
vec![
1716+
PhysicalSortExpr {
1717+
expr: Arc::new(Column::new("e", 1)),
1718+
options: SortOptions::default(),
1719+
},
1720+
PhysicalSortExpr {
1721+
expr: Arc::new(Column::new("a", 2)),
1722+
options: SortOptions::default(),
1723+
},
1724+
],
1725+
vec![PhysicalSortExpr {
1726+
expr: Arc::new(Column::new("d", 0)),
1727+
options: SortOptions::default(),
1728+
}],
1729+
]
1730+
);
1731+
assert!(result.is_infinite());
1732+
1733+
Ok(())
1734+
}
1735+
15781736
#[test]
15791737
fn test_projection_after_projection() -> Result<()> {
15801738
let csv = create_simple_csv_exec();

datafusion/physical-plan/src/streaming.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use crate::stream::RecordBatchStreamAdapter;
2626
use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
2727

2828
use arrow::datatypes::SchemaRef;
29+
use arrow_schema::Schema;
2930
use datafusion_common::{internal_err, plan_err, DataFusionError, Result};
3031
use datafusion_execution::TaskContext;
3132
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};
@@ -70,9 +71,9 @@ impl StreamingTableExec {
7071
) -> Result<Self> {
7172
for x in partitions.iter() {
7273
let partition_schema = x.schema();
73-
if !schema.contains(partition_schema) {
74+
if !schema.eq(partition_schema) {
7475
debug!(
75-
"target schema does not contain partition schema. \
76+
"Target schema does not match with partition schema. \
7677
Target_schema: {schema:?}. Partiton Schema: {partition_schema:?}"
7778
);
7879
return plan_err!("Mismatch between schema and batches");
@@ -92,6 +93,30 @@ impl StreamingTableExec {
9293
infinite,
9394
})
9495
}
96+
97+
pub fn partitions(&self) -> &Vec<Arc<dyn PartitionStream>> {
98+
&self.partitions
99+
}
100+
101+
pub fn partition_schema(&self) -> &SchemaRef {
102+
self.partitions[0].schema()
103+
}
104+
105+
pub fn projection(&self) -> &Option<Arc<[usize]>> {
106+
&self.projection
107+
}
108+
109+
pub fn projected_schema(&self) -> &Schema {
110+
&self.projected_schema
111+
}
112+
113+
pub fn projected_output_ordering(&self) -> impl IntoIterator<Item = LexOrdering> {
114+
self.projected_output_ordering.clone()
115+
}
116+
117+
pub fn is_infinite(&self) -> bool {
118+
self.infinite
119+
}
95120
}
96121

97122
impl std::fmt::Debug for StreamingTableExec {

0 commit comments

Comments
 (0)