fix(functions-aggregate): drain CORR state vectors for streaming aggregation#19669
fix(functions-aggregate): drain CORR state vectors for streaming aggregation#19669Jefffrey merged 3 commits intoapache:mainfrom
Conversation
| 2 2 NULL | ||
| 2 3 NULL | ||
| 2 4 NULL | ||
|
|
There was a problem hiding this comment.
It would be good to add a companion EXPLAIN query to verify that it uses the streaming path.
There was a problem hiding this comment.
I had it at first, and removed it as I found it too verbose. Same with a dedicated unit test in correlation.rs, which seemed out of place and only serving as a "demo" of the bug.
Adding just the EXPLAIN for CORR seems too specific to me here. However, I think it would make a lot of sense to actually have a dedicated .slt that runs EXPLAIN and the actual query for all aggregates.
@martin-g WDYT?
EDIT: pushed new comprehensive tests in commit test: add comprehensive aggregate tests for streaming aggregation
There was a problem hiding this comment.
Either way is fine as long as there is a way to assert that it behaves the way it is supposed to be.
Jefffrey
left a comment
There was a problem hiding this comment.
I think just need to resolve conflict then should be good to go @geoffreyclaude
d2bbcb0 to
d2e9888
Compare
|
Thanks @geoffreyclaude & @martin-g |
Which issue does this PR close?
Rationale for this change
This change addresses a failure in the
CORRaggregate function when running in streaming mode. TheCorrelationGroupsAccumulator(introduced in PR #13581) was failing to drain its state vectors duringEmitTo::Firstcalls, causing internal state to persist across emissions. This led to memory leaks, incorrect results for subsequent groups, and "length mismatch" errors because the internal vector sizes diverged from the number of emitted groups.Reproducer
Before:
DataFusion error: Arrow error: Invalid argument error: all columns in a record batch must have the same lengthAfter:
What changes are included in this PR?
This PR is structured into two commits: the first adds a failing test case to demonstrate the issue, and the second implements the fix.
The accumulator now uses
emit_to.take_needed()in bothevaluateandstateto properly consume the emitted portions of the state vectors. Additionally, thesize()implementation has been updated to use vector capacity for more accurate memory accounting.Are these changes tested?
Yes, a new test case in
aggregate.slttriggers streaming aggregation via an ordered subquery. This test previously crashed with an Arrow length mismatch error and now produces correct results.Are there any user-facing changes?
Yes, SQL queries that trigger streaming aggregation using
CORR(typically those with specific ordering requirements) will now succeed instead of failing with a length mismatch error.