Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
4a3e553
Initial implementation
neilconway Feb 20, 2026
6b4f5c0
cargo fmt
neilconway Mar 29, 2026
1412ab1
Properly wait for subquery exec to complete before exec'ing main input
neilconway Mar 29, 2026
cedfa5c
Better fix for async exec issue
neilconway Mar 29, 2026
d80569f
Fix doc lint error
neilconway Mar 29, 2026
9f606fb
Implement logical plan serialization/deserialization for subqueries
neilconway Mar 30, 2026
b07491b
cargo fmt
neilconway Mar 30, 2026
27a1ac2
Refactor logical plan deserialization
neilconway Mar 30, 2026
bce0a6d
Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery…
neilconway Mar 30, 2026
7071001
Increase large files size check
neilconway Mar 30, 2026
b9bce91
fix clippy
neilconway Mar 30, 2026
7c965aa
Update expected TPC-H plans
neilconway Mar 30, 2026
09f167a
Implement statistics
neilconway Mar 30, 2026
54a9f79
Tweak comments
neilconway Mar 30, 2026
b979e3d
Merge branch 'main' into neilc/scalar-subquery-expr
neilconway Mar 30, 2026
2c256e7
Ensure projection pushdown works inside uncorrelated subqueries
neilconway Mar 30, 2026
99d9bcf
Update expected plans
neilconway Mar 30, 2026
9a11d62
Fix overlooked cases for projection pushdown
neilconway Mar 31, 2026
9b217ca
Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery…
neilconway Mar 31, 2026
5aef67e
Fix line numbers in expected EXPLAIN
neilconway Mar 31, 2026
3d0b99f
Evaluate subqueries in parallel
neilconway Mar 31, 2026
f99ded5
Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery…
neilconway Apr 2, 2026
b02abf8
Don't try to use subquery filters for partition pruning
neilconway Apr 2, 2026
3971312
Raise an error if duplicate subquery eval is detected
neilconway Apr 2, 2026
64e9f34
cargo fmt
neilconway Apr 2, 2026
26d8acb
Update expected plan
neilconway Apr 2, 2026
d2af491
Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery…
neilconway Apr 2, 2026
f9c9d5d
Remove unnecessary IN/EXISTS serialization code
neilconway Apr 3, 2026
92e6054
Code cleanup
neilconway Apr 3, 2026
6857966
Code cleanup
neilconway Apr 3, 2026
6a4f524
Code cleanup and refactoring
neilconway Apr 3, 2026
7adb788
Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery…
neilconway Apr 3, 2026
670139c
Updates for plan API changes
neilconway Apr 3, 2026
1239e3a
Fix doc build
neilconway Apr 3, 2026
7bb6959
Add sanity check on subquery schema, per review
neilconway Apr 12, 2026
4c824a4
Improve comments per review
neilconway Apr 13, 2026
ee58247
Introduce struct wrapping scalar subquery results, per review
neilconway Apr 13, 2026
f582628
Fix reset_state bug
neilconway Apr 13, 2026
4e7442c
Simplify new test case
neilconway Apr 13, 2026
a2087d0
Remove benchmarks, not useful
neilconway Apr 14, 2026
dc4ca31
Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery…
neilconway Apr 14, 2026
0416b0d
Refactor unit test code
neilconway Apr 14, 2026
b9d307b
Fix doc build
neilconway Apr 14, 2026
e787873
Doc fix
neilconway Apr 14, 2026
ce1e3c0
Tweak comments
neilconway Apr 14, 2026
c32edba
Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery…
neilconway Apr 14, 2026
50d0ef8
Revert clippy benchmark fix
neilconway Apr 14, 2026
5935028
Update stale comment
neilconway Apr 14, 2026
4b60787
Use IndexSet instead of HashSet + Vec
neilconway Apr 14, 2026
177a190
Reduce unnecessary cloning of subqueries
neilconway Apr 14, 2026
cf582e3
Introduce type wrapper for subquery index
neilconway Apr 14, 2026
2245153
Minor cleanups
neilconway Apr 14, 2026
19f796c
Fix clippy
neilconway Apr 14, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/large_files.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ jobs:
fetch-depth: 0
- name: Check size of new Git objects
env:
# 1 MB ought to be enough for anybody.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need to up the limit? this repo gets checked out a lot

What is so large that required increasing to 2MB?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this because pbjson.rs started to exceed the limit (this PR only increases its size slightly, but it is only a hair under 1MB in mainline).

We could certainly make the limit tighter (e.g., 1.2MB) -- or if there's a different approach you prefer, lmk.

# 2 MB ought to be enough for anybody.
# TODO in case we may want to consciously commit a bigger file to the repo without using Git LFS we may disable the check e.g. with a label
MAX_FILE_SIZE_BYTES: 1048576
MAX_FILE_SIZE_BYTES: 2097152
shell: bash
run: |
if [ "${{ github.event_name }}" = "merge_group" ]; then
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ datafusion-session = { workspace = true }
datafusion-sql = { workspace = true, optional = true }
flate2 = { workspace = true, optional = true }
futures = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true }
liblzma = { workspace = true, optional = true }
log = { workspace = true }
Expand Down
357 changes: 295 additions & 62 deletions datafusion/core/src/physical_planner.rs

Large diffs are not rendered by default.

153 changes: 149 additions & 4 deletions datafusion/expr/src/execution_props.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
use crate::var_provider::{VarProvider, VarType};
use chrono::{DateTime, Utc};
use datafusion_common::HashMap;
use datafusion_common::ScalarValue;
use datafusion_common::alias::AliasGenerator;
use datafusion_common::config::ConfigOptions;
use std::sync::Arc;
use datafusion_common::{Result, internal_err};
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};

/// Holds per-query execution properties and data (such as statement
/// starting timestamps).
Expand All @@ -42,6 +46,12 @@ pub struct ExecutionProps {
pub config_options: Option<Arc<ConfigOptions>>,
/// Providers for scalar variables
pub var_providers: Option<HashMap<VarType, Arc<dyn VarProvider + Send + Sync>>>,
/// Maps each logical `Subquery` to its index in `subquery_results`.
/// Populated by the physical planner before calling `create_physical_expr`.
pub subquery_indexes: HashMap<crate::logical_plan::Subquery, SubqueryIndex>,
/// Shared results container for uncorrelated scalar subquery values.
/// Populated at execution time by `ScalarSubqueryExec`.
pub subquery_results: ScalarSubqueryResults,
}

impl Default for ExecutionProps {
Expand All @@ -58,6 +68,8 @@ impl ExecutionProps {
alias_generator: Arc::new(AliasGenerator::new()),
config_options: None,
var_providers: None,
subquery_indexes: HashMap::new(),
subquery_results: ScalarSubqueryResults::default(),
}
}

Expand Down Expand Up @@ -85,8 +97,7 @@ impl ExecutionProps {
&*self
}

/// Registers a variable provider, returning the existing
/// provider, if any
/// Registers a variable provider, returning the existing provider, if any
pub fn add_var_provider(
&mut self,
var_type: VarType,
Expand Down Expand Up @@ -119,15 +130,149 @@ impl ExecutionProps {
}
}

/// Identifies the slot of a scalar subquery within a
/// [`ScalarSubqueryResults`] container.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SubqueryIndex(usize);

impl SubqueryIndex {
    /// Wraps a raw slot position as a `SubqueryIndex`.
    pub const fn new(index: usize) -> Self {
        SubqueryIndex(index)
    }

    /// Unwraps this index back into the raw slot position.
    pub const fn as_usize(self) -> usize {
        let SubqueryIndex(raw) = self;
        raw
    }
}

/// Shared container holding the computed values of uncorrelated scalar
/// subqueries.
///
/// The container has one slot per scalar subquery, addressed by a
/// [`SubqueryIndex`]. A slot is written exactly once at execution time by
/// `ScalarSubqueryExec`, read by the `ScalarSubqueryExpr` instances that share
/// this container, and emptied via [`Self::clear`] when the plan is reset for
/// re-execution. Cloning is cheap: every clone shares the same slots.
#[derive(Clone, Default)]
pub struct ScalarSubqueryResults {
    // One mutex-guarded slot per subquery; `None` until populated.
    slots: Arc<Vec<Mutex<Option<ScalarValue>>>>,
}

impl ScalarSubqueryResults {
    /// Creates a new shared results container with `n` empty slots.
    pub fn new(n: usize) -> Self {
        let slots = std::iter::repeat_with(|| Mutex::new(None)).take(n).collect();
        Self {
            slots: Arc::new(slots),
        }
    }

    /// Returns the scalar value stored at `index`, or `None` when the slot is
    /// still empty (or `index` is out of bounds).
    pub fn get(&self, index: SubqueryIndex) -> Option<ScalarValue> {
        self.slots
            .get(index.as_usize())
            .and_then(|slot| slot.lock().unwrap().clone())
    }

    /// Stores `value` in the slot at `index`.
    ///
    /// Returns an internal error when `index` is out of bounds or the slot has
    /// already been written (duplicate subquery evaluation).
    pub fn set(&self, index: SubqueryIndex, value: ScalarValue) -> Result<()> {
        match self.slots.get(index.as_usize()) {
            None => internal_err!(
                "ScalarSubqueryResults: result index {} is out of bounds",
                index.as_usize()
            ),
            Some(slot) => {
                let mut guard = slot.lock().unwrap();
                if guard.is_some() {
                    internal_err!(
                        "ScalarSubqueryResults: result for index {} was already populated",
                        index.as_usize()
                    )
                } else {
                    *guard = Some(value);
                    Ok(())
                }
            }
        }
    }

    /// Clears all populated results so the container can be reused.
    pub fn clear(&self) {
        for slot in self.slots.iter() {
            slot.lock().unwrap().take();
        }
    }

    /// Returns true if `this` and `other` point to the same shared container.
    pub fn ptr_eq(this: &Self, other: &Self) -> bool {
        Arc::ptr_eq(&this.slots, &other.slots)
    }
}

impl fmt::Debug for ScalarSubqueryResults {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut list = f.debug_list();
        for slot in self.slots.iter() {
            list.entry(&*slot.lock().unwrap());
        }
        list.finish()
    }
}

impl PartialEq for ScalarSubqueryResults {
    // Identity semantics: equal only when sharing the same allocation.
    fn eq(&self, other: &Self) -> bool {
        ScalarSubqueryResults::ptr_eq(self, other)
    }
}

impl Eq for ScalarSubqueryResults {}

impl Hash for ScalarSubqueryResults {
    // Hash the shared allocation's address, consistent with `PartialEq`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        Arc::as_ptr(&self.slots).hash(state);
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn debug() {
        // The Debug output must include the new subquery-related fields.
        let props = ExecutionProps::new();
        assert_eq!(
            "ExecutionProps { query_execution_start_time: None, alias_generator: AliasGenerator { next_id: 1 }, config_options: None, var_providers: None, subquery_indexes: {}, subquery_results: [] }",
            format!("{props:?}")
        );
    }

    #[test]
    fn scalar_subquery_results_set_and_get() -> Result<()> {
        let results = ScalarSubqueryResults::new(1);
        assert_eq!(results.get(SubqueryIndex::new(0)), None);

        results.set(SubqueryIndex::new(0), ScalarValue::Int32(Some(42)))?;
        assert_eq!(
            results.get(SubqueryIndex::new(0)),
            Some(ScalarValue::Int32(Some(42)))
        );
        // A second write to an already-populated slot must be rejected.
        assert!(
            results
                .set(SubqueryIndex::new(0), ScalarValue::Int32(Some(7)))
                .is_err()
        );

        Ok(())
    }

    #[test]
    fn scalar_subquery_results_clear() -> Result<()> {
        let results = ScalarSubqueryResults::new(1);
        results.set(SubqueryIndex::new(0), ScalarValue::Int32(Some(42)))?;

        results.clear();

        // After clearing, the slot reads as empty and can be repopulated.
        assert_eq!(results.get(SubqueryIndex::new(0)), None);
        results.set(SubqueryIndex::new(0), ScalarValue::Int32(Some(7)))?;
        assert_eq!(
            results.get(SubqueryIndex::new(0)),
            Some(ScalarValue::Int32(Some(7)))
        );

        Ok(())
    }
}
6 changes: 6 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2059,6 +2059,12 @@ impl Expr {
.expect("exists closure is infallible")
}

/// Returns true if any node in this expression tree is a scalar subquery
/// (`Expr::ScalarSubquery`).
pub fn contains_scalar_subquery(&self) -> bool {
    self.exists(|expr| match expr {
        Expr::ScalarSubquery(_) => Ok(true),
        _ => Ok(false),
    })
    .expect("exists closure is infallible")
}

/// Returns true if the expression node is volatile, i.e. whether it can return
/// different results when evaluated multiple times with the same input.
/// Note: unlike [`Self::is_volatile`], this function does not consider inputs:
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3768,6 +3768,7 @@ impl PartialOrd for Aggregate {
/// index among identical entries. For example, if the same set appears three
/// times, the ordinals are 0, 1, 2 and this function returns 2.
/// Returns 0 when no grouping set is duplicated.
#[allow(clippy::allow_attributes, clippy::mutable_key_type)] // Expr contains Arc with interior mutability but is intentionally used as hash key
fn max_grouping_set_duplicate_ordinal(group_expr: &[Expr]) -> usize {
if let Some(Expr::GroupingSet(GroupingSet::GroupingSets(sets))) = group_expr.first() {
let mut counts: HashMap<&[Expr], usize> = HashMap::new();
Expand Down
20 changes: 16 additions & 4 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ impl LogicalPlan {
transform_down_up_with_subqueries_impl(self, &mut f_down, &mut f_up)
}

/// Similarly to [`Self::apply`], calls `f` on this node and its inputs
/// Similarly to [`Self::apply`], calls `f` on this node and its inputs,
/// including subqueries that may appear in expressions such as `IN (SELECT
/// ...)`.
pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
Expand All @@ -821,9 +821,7 @@ impl LogicalPlan {
| Expr::InSubquery(InSubquery { subquery, .. })
| Expr::SetComparison(SetComparison { subquery, .. })
| Expr::ScalarSubquery(subquery) => {
// use a synthetic plan so the collector sees a
// LogicalPlan::Subquery (even though it is
// actually a Subquery alias)
// Wrap in LogicalPlan::Subquery to match f's signature
f(&LogicalPlan::Subquery(subquery.clone()))
}
_ => Ok(TreeNodeRecursion::Continue),
Expand Down Expand Up @@ -888,4 +886,18 @@ impl LogicalPlan {
})
})
}

/// Variant of [`Self::map_subqueries`] that rewrites only uncorrelated
/// subqueries — those whose `outer_ref_columns` is empty. Correlated
/// subqueries are passed through unchanged.
pub fn map_uncorrelated_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
    self,
    mut f: F,
) -> Result<Transformed<Self>> {
    self.map_subqueries(|subquery_plan| {
        let is_uncorrelated = matches!(
            &subquery_plan,
            LogicalPlan::Subquery(sq) if sq.outer_ref_columns.is_empty()
        );
        if is_uncorrelated {
            f(subquery_plan)
        } else {
            Ok(Transformed::no(subquery_plan))
        }
    })
}
}
8 changes: 6 additions & 2 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,12 @@ impl OptimizerRule for CommonSubexprEliminate {
| LogicalPlan::Unnest(_)
| LogicalPlan::RecursiveQuery(_) => {
// This rule handles recursion itself in a `ApplyOrder::TopDown` like
// manner.
plan.map_children(|c| self.rewrite(c, config))?
// manner. Process uncorrelated subqueries in expressions
// (e.g., Expr::ScalarSubquery), then direct children.
plan.map_uncorrelated_subqueries(|c| self.rewrite(c, config))?
.transform_sibling(|plan| {
plan.map_children(|c| self.rewrite(c, config))
})?
}
};

Expand Down
7 changes: 6 additions & 1 deletion datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,12 @@ fn rewrite_children(
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
let transformed_plan = plan.map_children(|input| optimizer.rewrite(input, config))?;
// Process uncorrelated subqueries in expressions, then direct children.
let transformed_plan = plan
.map_uncorrelated_subqueries(|input| optimizer.rewrite(input, config))?
.transform_sibling(|plan| {
plan.map_children(|input| optimizer.rewrite(input, config))
})?;

// recompute schema if the plan was transformed
if transformed_plan.transformed {
Expand Down
33 changes: 27 additions & 6 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,11 @@ fn optimize_projections(
// their parents' required indices.
match plan {
LogicalPlan::Projection(proj) => {
return merge_consecutive_projections(proj)?.transform_data(|proj| {
rewrite_projection_given_requirements(proj, config, &indices)
});
return merge_consecutive_projections(proj)?
.transform_data(|proj| {
rewrite_projection_given_requirements(proj, config, &indices)
})?
.transform_data(|plan| optimize_subqueries(plan, config));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is unfortunate as it seems like we'll have to add this extra traversal for all passes that want to recurse into subqueries.

However, I think that is not introduced by this PR, so we can perhaps deal with it in a follow-on

}
LogicalPlan::Aggregate(aggregate) => {
// Split parent requirements to GROUP BY and aggregate sections:
Expand Down Expand Up @@ -210,7 +212,8 @@ fn optimize_projections(
new_aggr_expr,
)
.map(LogicalPlan::Aggregate)
});
})?
.transform_data(|plan| optimize_subqueries(plan, config));
}
LogicalPlan::Window(window) => {
let input_schema = Arc::clone(window.input.schema());
Expand Down Expand Up @@ -250,7 +253,8 @@ fn optimize_projections(
.map(LogicalPlan::Window)
.map(Transformed::yes)
}
});
})?
.transform_data(|plan| optimize_subqueries(plan, config));
}
LogicalPlan::TableScan(table_scan) => {
let TableScan {
Expand All @@ -271,7 +275,8 @@ fn optimize_projections(
let new_scan =
TableScan::try_new(table_name, source, Some(projection), filters, fetch)?;

return Ok(Transformed::yes(LogicalPlan::TableScan(new_scan)));
return Transformed::yes(LogicalPlan::TableScan(new_scan))
.transform_data(|plan| optimize_subqueries(plan, config));
}
// Other node types are handled below
_ => {}
Expand Down Expand Up @@ -463,6 +468,9 @@ fn optimize_projections(
)
})?;

let transformed_plan =
transformed_plan.transform_data(|plan| optimize_subqueries(plan, config))?;

// If any of the children are transformed, we need to potentially update the plan's schema
if transformed_plan.transformed {
transformed_plan.map_data(|plan| plan.recompute_schema())
Expand All @@ -473,6 +481,19 @@ fn optimize_projections(

/// Optimizes uncorrelated subquery plans embedded in expressions of the given
/// plan node (e.g., `Expr::ScalarSubquery`). `map_children` only visits direct
/// plan inputs, so subqueries must be handled separately.
///
/// Correlated subqueries are left untouched; their outer references cannot be
/// resolved against the subquery plan alone.
fn optimize_subqueries(
    plan: LogicalPlan,
    config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
    plan.map_uncorrelated_subqueries(|subquery_plan| {
        // The subquery result feeds an expression, so every output expression
        // of the subquery plan is required.
        let indices = RequiredIndices::new_for_all_exprs(&subquery_plan);
        optimize_projections(subquery_plan, config, indices)
    })
}

/// Given a projection `proj`, this function attempts to merge it with a previous
/// projection if it exists and if merging is beneficial. Merging is considered
/// beneficial when expressions in the current projection are non-trivial and
Expand Down
Loading
Loading