perf: Implement physical execution of uncorrelated scalar subqueries #21240

neilconway wants to merge 53 commits into apache:main from neilc/scalar-subquery-expr
Conversation
```diff
-pub struct DefaultPhysicalProtoConverter;
+#[derive(Default)]
+pub struct DefaultPhysicalProtoConverter {
+    scalar_subquery_results: RefCell<Option<ScalarSubqueryResults>>,
```
I don't know the serialization/deserialization code well; would love feedback on whether this is the right way to do this.
This feels like a bit of an anti-pattern. I'm going to need a bit of time to dive into what's going on here, but hopefully will get to it either this afternoon or maybe Sunday evening.
I put up this PR targeting your branch as an explanation of what I mean.
The problem I have with adding state data to DefaultPhysicalProtoConverter is that now any time we have a custom proto converter that doesn't call the default, we will not be able to process these scalar subquery results.
Instead I think we just have to plumb this data member through the deserialization process. I haven't taken a super deep look into exactly how this ends up getting used to see if there's another way to take advantage. The method I used in the PR was basically to add a struct that contains all of the parts we pass through deserialization and add the scalar_subquery_results to it.
Regarding switching from FunctionRegistry -> TaskContext, that's a great change. It was done partway in recent releases on the physical side but not on the logical side. It makes perfect sense to do it the way you have on the logical side.
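A minimal sketch of the "bundle the parts" approach suggested above, assuming hypothetical type and field names (the real DataFusion types differ): everything that flows through proto deserialization is grouped into one context struct, so adding a new member like `scalar_subquery_results` doesn't require touching every function signature again.

```rust
// Hypothetical stand-ins for the real DataFusion types; names here are
// illustrative, not the actual API.
pub struct TaskContext;
pub struct ScalarSubqueryResults;

/// Bundle everything that flows through deserialization into one struct.
pub struct DeserContext<'a> {
    pub ctx: &'a TaskContext,
    pub scalar_subquery_results: Option<&'a ScalarSubqueryResults>,
}

pub fn parse_physical_expr(bytes: &[u8], deser: &DeserContext<'_>) -> Result<usize, String> {
    // A real implementation would decode `bytes` and consult
    // `deser.scalar_subquery_results` when it encounters a
    // ScalarSubqueryExpr node; here we just report the byte count.
    let _ = deser.scalar_subquery_results.is_some();
    Ok(bytes.len())
}
```

With this shape, a custom proto converter that bypasses the default still receives the subquery results, because they travel with the context rather than living as state on one converter implementation.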
```rust
// Create the shared results container and register it (along with
// the index map) in ExecutionProps so that `create_physical_expr`
// can resolve `Expr::ScalarSubquery` into `ScalarSubqueryExpr`
// nodes. We clone the SessionState so these are available
// throughout physical planning without mutating the caller's state.
//
// Ideally, the subquery state would live in a dedicated planning
// context rather than on ExecutionProps (which is meant for
// session-level configuration). It's here because
// `create_physical_expr` only receives `&ExecutionProps`, and
// changing that signature would be a breaking public API change.
let results: Arc<Vec<OnceLock<ScalarValue>>> =
    Arc::new((0..links.len()).map(|_| OnceLock::new()).collect());
let session_state = if links.is_empty() {
    Cow::Borrowed(session_state)
} else {
    let mut owned = session_state.clone();
    owned.execution_props_mut().subquery_indexes = index_map;
    owned.execution_props_mut().subquery_results = Arc::clone(&results);
    Cow::Owned(owned)
};
```
This seemed a bit kludgy but I couldn't think of a better way to do it; feedback/suggestions welcome.
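For what it's worth, the clone-on-demand shape itself is a standard `Cow` idiom. A minimal sketch (with a toy `Vec<i32>` standing in for `SessionState`): borrow the caller's value when no change is needed, and clone-and-mutate only when it is.

```rust
use std::borrow::Cow;

// Borrow the caller's state when nothing needs to change (the common
// case: no subqueries); clone and mutate only when required.
fn maybe_customize(base: &Vec<i32>, extra: Option<i32>) -> Cow<'_, Vec<i32>> {
    match extra {
        None => Cow::Borrowed(base),
        Some(v) => {
            let mut owned = base.clone();
            owned.push(v);
            Cow::Owned(owned)
        }
    }
}
```

The upside is that queries without subqueries pay no cloning cost; the downside, as noted, is that the cloned state silently diverges from the caller's state for the rest of planning.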
|
run benchmarks |
|
🤖 Benchmark running (GKE): comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) using clickbench_partitioned
|
🤖 Benchmark running (GKE): comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) using tpcds
|
🤖 Benchmark running (GKE): comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) using tpch
|
🤖 Benchmark completed (GKE): tpch — base (merge-base) and branch results
|
run benchmark tpch10 |
|
🤖 Benchmark running (GKE): comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) using tpch10
|
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) and branch results
|
Any chance we can break this PR into smaller pieces (e.g., moving out the benchmarks) to make it easier to review? |
Hmmm, that might be a bit tricky. The benchmarks are pretty trivial and could easily be omitted. Here's how Claude summarizes the PR:
If it is helpful, I could prepare two PRs that have a split like:
If you think that would be easier to review, lmk. |
|
Ok, will try and review shortly |
|
I started reviewing this PR and will hopefully complete the review shortly |
Thanks @alamb ! Feel free to ping me if you have any questions or want to discuss. |
alamb left a comment
I went through this PR carefully and overall I think it looks great. Thank you so much @neilconway -- the implementation makes sense and I think it moves the needle forward for subquery execution
Things I would like to see before I approve this PR:
- Why is the large file size change required?
- Fix `reset_state` (see inlined comment) as I think that would be a regression
- Someone more knowledgeable than me should review the changes to the datafusion-proto traits.
I left a bunch of other comments/questions which I think are not required for this PR to merge but may be worth considering.
Protobuf changes
I am not sure about the changes to the protobuf serialization / registries / etc (e.g. to take TaskContext rather than FunctionRegistry); I think @timsaucer and @milenkovicm are more clued in than I am in this area
Perhaps you could break those changes (to protobuf serialization traits) into a separate PR so it is easier for them to review / evaluate the scope of the changes
Suggested breakout
Also, breaking out the new .slt tests would help me evaluate the change introduced by this PR (see comments)
```yaml
    fetch-depth: 0
- name: Check size of new Git objects
  env:
    # 1 MB ought to be enough for anybody.
```
do we really need to up the limit? this repo gets checked out a lot
What is so large that required increasing to 2MB?
I changed this because pbjson.rs started to exceed the limit (this PR only increases its size slightly, but it is only a hair under 1MB in mainline).
We could certainly make the limit tighter (e.g., 1.2MB) -- or if there's a different approach you prefer, lmk.
```rust
pub var_providers: Option<HashMap<VarType, Arc<dyn VarProvider + Send + Sync>>>,
/// Maps each logical `Subquery` to its index in `subquery_results`.
/// Populated by the physical planner before calling `create_physical_expr`.
pub subquery_indexes: HashMap<crate::logical_plan::Subquery, usize>,
```
Codex points out that two logically equivalent subqueries (i.e., with the same SQL text) will actually be treated as different because their spans are different.
I think this is ok, and we could potentially detect and optimize away duplicated scalar subqueries as a follow-on PR (we would also have to detect volatile (random) functions, etc.)
I took a brief look at doing this and it seems doable but not trivial; I filed #21619 to track this, but I think it's out of the scope of this PR.
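To illustrate the follow-on idea, here's a toy sketch of span-insensitive dedup (not the actual DataFusion code — subqueries are represented as plain strings here, standing in for some normalized, span-stripped form): equivalent subqueries are keyed to the same result slot, so they'd be evaluated once.

```rust
use std::collections::HashMap;

// Assign each subquery a result-slot index, reusing the slot when two
// subqueries normalize to the same key. Returns (slot index per input,
// number of distinct slots).
fn assign_slots(subqueries: &[&str]) -> (Vec<usize>, usize) {
    let mut slot_of: HashMap<String, usize> = HashMap::new();
    let mut next = 0;
    let indexes = subqueries
        .iter()
        .map(|sq| {
            *slot_of.entry(sq.to_string()).or_insert_with(|| {
                let slot = next;
                next += 1;
                slot
            })
        })
        .collect();
    (indexes, next)
}
```

As the comment above notes, a real version would also have to exclude subqueries containing volatile functions (e.g., `random()`), which must not share a slot even when textually identical.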
```diff
-    bytes: &[u8],
-    registry: &dyn FunctionRegistry,
-) -> Result<Self>;
+fn from_bytes_with_ctx(bytes: &[u8], ctx: &TaskContext) -> Result<Self>;
```
I think this is technically a breaking API change -- maybe we can leave the old method in there and mark it deprecated? Otherwise we should add a note to the upgrade guide
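One hedged sketch of the deprecation route (trait and type names are stand-ins; whether the old method can delegate depends on being able to obtain a `TaskContext`, which is an assumption here):

```rust
// Hypothetical stand-ins; the real trait lives in datafusion-proto and
// its exact signatures differ.
pub struct TaskContext;
pub struct FunctionRegistry;

pub trait FromBytesCompat: Sized {
    fn from_bytes_with_ctx(bytes: &[u8], ctx: &TaskContext) -> Result<Self, String>;

    // Keep the old entry point so downstream callers still compile,
    // but mark it deprecated to steer them toward the new method.
    #[deprecated(note = "use from_bytes_with_ctx instead")]
    fn from_bytes(bytes: &[u8], _registry: &FunctionRegistry) -> Result<Self, String> {
        // Assumption: a suitable TaskContext can be constructed here.
        Self::from_bytes_with_ctx(bytes, &TaskContext)
    }
}

pub struct Payload(pub usize);

impl FromBytesCompat for Payload {
    fn from_bytes_with_ctx(bytes: &[u8], _ctx: &TaskContext) -> Result<Self, String> {
        Ok(Payload(bytes.len()))
    }
}
```

If delegation isn't feasible, the alternative is keeping both methods abstract and documenting the migration in the upgrade guide, as suggested above.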
Sorry for the massive review (though I feel somewhat justified b/c the PR was large 😆 ) |
|
@alamb AMAZING!!! Thank you for the thorough review, I really appreciate it. I'll take a look at the comments and respond shortly. |
we should have replaced |
Which issue does this PR close?

- `array_has` #18181.

Rationale for this change

Previously, DataFusion evaluated uncorrelated scalar subqueries by transforming them into joins. This has two shortcomings:

- Scalar subqueries could not appear in some contexts, such as `ORDER BY` or `JOIN ON`, or as arguments to an aggregate function. Those cases are now supported.
- Scalar subqueries that return more than one row could produce incorrect results instead of being rejected (see "user-facing changes" below).

This PR introduces physical execution of uncorrelated scalar subqueries:

- The physical planner adds a `ScalarSubqueryExec` plan node to the top of any physical plan with uncorrelated subqueries: it has N+1 children, N subqueries and its "main" input, which is the rest of the query plan. The subquery expression in the parent plan is replaced with a `ScalarSubqueryExpr`.
- `ScalarSubqueryExec` manages the execution of the subqueries. Subquery evaluation is done in parallel (for a given query level), but at present it happens strictly before evaluation of the parent query. This might be improved in the future (Consider overlapping scalar subquery and parent query computation, #21591).
- `ScalarSubqueryExpr` fetches results from `ScalarSubqueryExec` via `ScalarSubqueryResults`, which contains an `Arc<Vec<Mutex<Option<ScalarValue>>>>`, with one "slot" in the vector for each subquery. Each `Expr` knows the index of its result slot because it is passed in when the `Expr` is created; `create_physical_expr` knows which index to pass because the physical planner creates a map of `Subquery` -> subquery index (`usize`), which is stored in `ExecutionProps`.
- When a `ScalarSubqueryExpr` is evaluated, it fetches the result of the subquery from the result container.

This architecture makes it easy to avoid the two shortcomings described above. Performance seems roughly unchanged (benchmarks added in this PR), but in situations like #18181, we can now leverage scalar fast-paths; in the case of #18181 specifically, this improves performance from ~800 ms to ~30 ms.
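The result-slot handoff described above can be sketched in miniature (using `OnceLock` for a write-once slot, matching the planner snippet earlier in the thread; a `Mutex<Option<ScalarValue>>` per slot would behave equivalently, and `ScalarValue` is stubbed as `i64` here):

```rust
use std::sync::{Arc, OnceLock};
use std::thread;

// Hypothetical stand-in for DataFusion's ScalarValue.
type ScalarValue = i64;

// One write-once slot per subquery, shared between the "exec" side
// (which computes results, possibly in parallel) and the "expr" side
// (which later reads them by index).
fn run_subqueries(n: usize) -> Arc<Vec<OnceLock<ScalarValue>>> {
    let slots: Arc<Vec<OnceLock<ScalarValue>>> =
        Arc::new((0..n).map(|_| OnceLock::new()).collect());

    let handles: Vec<_> = (0..n)
        .map(|i| {
            let slots = Arc::clone(&slots);
            // Each "subquery" is a toy computation here.
            thread::spawn(move || {
                slots[i].set((i as i64) * 10).expect("slot written twice");
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    slots
}
```

After all "subqueries" finish, an expression holding slot index `i` reads its value with `slots[i].get()`, mirroring how a `ScalarSubqueryExpr` fetches its result before parent-query evaluation begins.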
What changes are included in this PR?

- `ScalarSubqueryExpr`
- `PhysicalProtoConverterExtension` to wire up `ScalarSubqueryExpr` correctly

Are these changes tested?

Yes.

Are there any user-facing changes?

At the SQL level, scalar subqueries that return more than one row will now be rejected instead of producing incorrect query results.

At the API level, this PR adds several new public APIs (e.g., `ScalarSubqueryExpr`, `ScalarSubqueryExec`) and makes breaking changes to several public APIs (e.g., `parse_expr`). It also introduces a new physical plan node (and allows `Subquery` to remain in logical plans); third-party query optimization code will encounter these nodes where it wouldn't have before.