feat: Support sliding window queries for MedianAccumulator by implementing retract_batch#19278
Conversation
2010YOUY01
left a comment
There was a problem hiding this comment.
This is great, thank you! Left some suggestions.
This PR adds a simple working solution, and it’s quite interesting to figure out how to retract efficiently for large windows 🤔
| median(value) OVER ( | ||
| PARTITION BY tags | ||
| ORDER BY timestamp | ||
| ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING |
There was a problem hiding this comment.
I recommend to test different window frames like UNBOUNDED PRECEDING/FOLLOWING
There was a problem hiding this comment.
I wasn't familiar with these before, but this was a great idea! It helped me find and understand a bug.
| # median_non_sliding_window | ||
| query ITRRRR | ||
| SELECT | ||
| timestamp, | ||
| tags, | ||
| value, | ||
| median(value) OVER ( | ||
| PARTITION BY tags | ||
| ORDER BY timestamp | ||
| ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | ||
| ) AS value_median_unbounded_preceding, | ||
| median(value) OVER ( | ||
| PARTITION BY tags | ||
| ORDER BY timestamp | ||
| ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING | ||
| ) AS value_median_unbounded_both, |
There was a problem hiding this comment.
For UNBOUNDED FOLLOWING, an error is raised when retract_batch() isn't implemented. I found that queries with UNBOUNDED PRECEDING do not trigger this and instead return incorrect results. I assume this is a bug, right? If so, I can file a ticket.
For example, if you remove the UNBOUNDED FOLLOWING case right below my comment here, and try the query on main, I get this diff instead of an error.
Results Diff
``` [Diff] (-expected|+actual) 1 tag1 10 10 30 - 2 tag1 20 15 30 - 3 tag1 30 20 30 - 4 tag1 40 25 30 - 5 tag1 50 30 30 + 2 tag1 20 20 30 + 3 tag1 30 30 30 + 4 tag1 40 40 30 + 5 tag1 50 50 30 1 tag2 60 60 80 - 2 tag2 70 65 80 - 3 tag2 80 70 80 - 4 tag2 90 75 80 - 5 tag2 100 80 80 + 2 tag2 70 70 80 + 3 tag2 80 80 80 + 4 tag2 90 90 80 + 5 tag2 100 100 80 ```There was a problem hiding this comment.
There is this quite informative comment which seems to explain why this is the case:
datafusion/datafusion/physical-expr/src/aggregate.rs
Lines 490 to 538 in befaf93
There was a problem hiding this comment.
I think we should file a ticket, the previous impl should be able to handle unbounded preceding as @Jefffrey explained, and the inconsistent results is likely to indicate a bug.
There was a problem hiding this comment.
Ah, so you're saying unbounded preceding is supposed to work even without retract_batch() implemented. I was originally under the impression that it wasn't, but no that makes total sense now.
In that case, I think this PR is already fixes the bug, so there's no need to submit an issue for that. I mentioned in this comment that passing mut instead of clearing state with take() (81ced74) fixes the results in the mod.rs test. I've verified this by copying that change (81ced74) over to main and testing it, and the results for that test change. It's completely unrelated to the new support for retract_batch(). We just have an integer overflow issue remaining, which I've submitted an issue for.
There was a problem hiding this comment.
Yes, this makes sense. I realized that the root cause is already known and it's not possible to cause issue else where.
| median(value) OVER ( | ||
| PARTITION BY tags | ||
| ORDER BY timestamp | ||
| ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING |
There was a problem hiding this comment.
I wasn't familiar with these before, but this was a great idea! It helped me find and understand a bug.
| | -85 | -101 | 14 | -12 | -12 | 83 | -101 | 4 | -54 | | ||
| | -85 | -101 | 17 | -25 | -25 | 83 | -101 | 5 | -31 | |
There was a problem hiding this comment.
I found that this test was returning incorrect results due to the bug I explained in another comment, instead of raising an error. The results here were fixed by updating evaluate() to pass a &mut instead of consuming the state with std::mem::take().
retract_batch
Jefffrey
left a comment
There was a problem hiding this comment.
Looks good to me, just one minor question on some of the updated test results
| | -85 | -48 | 6 | -35 | -36 | 83 | -85 | 2 | -43 | | ||
| | -85 | -5 | 4 | -37 | -40 | -5 | -85 | 1 | 83 | | ||
| | -85 | -54 | 15 | -17 | -18 | 83 | -101 | 4 | -38 | | ||
| | -85 | -56 | 2 | -70 | 57 | -56 | -85 | 1 | -25 | |
There was a problem hiding this comment.
I find this interesting, how we have -70 for the approx median but 57 for median 🤔
There was a problem hiding this comment.
Great catch. I looked into it, and it seems like it's wrapping around due to integer overflow while taking the average of the middle two values (since the count is even).
low: [-85], high: -56, median: 57 datatype: Int8
-85 + -56 = -141 -> wraparound to 115
Then 115 / 2 -> 57.5 -> 57 (truncated due to integer type)
What's our desired behavior in this case? We could promote to a larger datatype to perform the calculation. Also is it intentional to return the value as a truncated integer instead of a float?
There was a problem hiding this comment.
Regarding overflow, perhaps we should raise a separate issue to discuss/track this, as it does seem like incorrect behaviour.
We could do similar for the truncated integer behaviour; there was a recent issue asking about this for reference: #18867 (comment)
| # median_non_sliding_window | ||
| query ITRRRR | ||
| SELECT | ||
| timestamp, | ||
| tags, | ||
| value, | ||
| median(value) OVER ( | ||
| PARTITION BY tags | ||
| ORDER BY timestamp | ||
| ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | ||
| ) AS value_median_unbounded_preceding, | ||
| median(value) OVER ( | ||
| PARTITION BY tags | ||
| ORDER BY timestamp | ||
| ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING | ||
| ) AS value_median_unbounded_both, |
There was a problem hiding this comment.
There is this quite informative comment which seems to explain why this is the case:
datafusion/datafusion/physical-expr/src/aggregate.rs
Lines 490 to 538 in befaf93
| fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { | ||
| let values = values[0].as_primitive::<T>(); | ||
| for v in values.iter().flatten() { | ||
| if let Some(idx) = self.all_values.iter().position(|x| *x == v) { |
There was a problem hiding this comment.
It seems this could be very slow?
There was a problem hiding this comment.
Thanks, I improved it using a hashmap in 1b710cc
| # median_non_sliding_window | ||
| query ITRRRR | ||
| SELECT | ||
| timestamp, | ||
| tags, | ||
| value, | ||
| median(value) OVER ( | ||
| PARTITION BY tags | ||
| ORDER BY timestamp | ||
| ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | ||
| ) AS value_median_unbounded_preceding, | ||
| median(value) OVER ( | ||
| PARTITION BY tags | ||
| ORDER BY timestamp | ||
| ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING | ||
| ) AS value_median_unbounded_both, |
There was a problem hiding this comment.
I think we should file a ticket, the previous impl should be able to handle unbounded preceding as @Jefffrey explained, and the inconsistent results is likely to indicate a bug.
| } | ||
|
|
||
| fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { | ||
| let mut to_remove: HashMap<ScalarValue, usize> = HashMap::new(); |
There was a problem hiding this comment.
This seems like a good optimization with minimal added complexity.
Which issue does this PR close?
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Added tests
Are there any user-facing changes?
Computing the median() window is now supported instead of throwing an error