-
Notifications
You must be signed in to change notification settings - Fork 1.5k
fix: Incorrect memory accounting in array_agg
function
#16519
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
fix: Incorrect memory accounting in array_agg
function
#16519
Conversation
d4a69ce
to
1da3e04
Compare
// The ArrayRef might be holding a reference to its original input buffer, so | ||
// storing it here directly copied/compacted avoids over accounting memory | ||
// not used here. | ||
self.values | ||
.push(make_array(copy_array_data(&values.to_data()))); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 I'm not sure if this will solve the issue. Keep in mind that the merge_batch
method argument receives the states of other accumulators, which already hold "compacted" data, so I'd expect this compaction here to be unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should add a special case to copy_array_data to avoid copying the data when it already is only a single row / has no offset 🤔
Right now it seems to copy the data unconditionally which is a non trivial overhead on each row 🤔
datafusion/datafusion/common/src/scalar/mod.rs
Lines 3564 to 3567 in a87d6f2
pub fn copy_array_data(src_data: &ArrayData) -> ArrayData { | |
let mut copy = MutableArrayData::new(vec![&src_data], true, src_data.len()); | |
copy.extend(0, 0, src_data.len()); | |
copy.freeze() |
Perhaps we can do that as a follow of PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something along those lines ? (potentially handling all the primitive types and not just int64)
pub fn copy_if_small(inp: &dyn Array) -> Option<ArrayRef> {
let ratio = 20;
match inp.as_any().downcast_ref::<Int64Array>() {
Some(iarr) => {
// Only copy the array if it's really smaller than the backing buffer.
let capacity = iarr.values().inner().capacity();
let length = iarr.values().inner().len();
if ratio * length < capacity {
Some(make_array(copy_array_data(&inp.to_data())))
} else {
None
}
}
None => {
// Default to copy if we cannot infer the type.
Some(make_array(copy_array_data(&inp.to_data())))
}
}
}
The problem with this is that we will probably have to do it for each of the internal arrow array types (primitives + strings + ??).
I tried to find a way to access the size of the topmost parent array regardless of the array type but couldn't find one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe to clarify, I did try this approach and the benchmarks shows on-par performance but I'm wondering if maintenability wise it will be alright given the multitude of arrow types we would need to support (I only did it for int64array
since this is what the benchmarks are using):
Gnuplot not found, using plotters backend
array_agg i64 merge_batch no nulls
time: [59.457 ns 60.325 ns 61.260 ns]
change: [+7.3648% +8.6679% +10.200%] (p = 0.00 < 0.05)
Performance has regressed.
Benchmarking array_agg i64 merge_batch all nulls, 100% of nulls point to a zero length array: Collecting 100 samples in estimated 5.0000 s (135
array_agg i64 merge_batch all nulls, 100% of nulls point to a zero length array
time: [36.705 ns 37.023 ns 37.408 ns]
change: [+2.9532% +3.9617% +5.0337%] (p = 0.00 < 0.05)
Performance has regressed.
Found 2 outliers among 100 measurements (2.00%)
2 (2.00%) high severe
Benchmarking array_agg i64 merge_batch all nulls, 90% of nulls point to a zero length array: Collecting 100 samples in estimated 5.0000 s (135M
array_agg i64 merge_batch all nulls, 90% of nulls point to a zero length array
time: [36.260 ns 36.470 ns 36.725 ns]
change: [+1.7526% +2.6531% +3.4398%] (p = 0.00 < 0.05)
Performance has regressed.
Found 1 outliers among 100 measurements (1.00%)
1 (1.00%) high mild
Benchmarking array_agg i64 merge_batch 30% nulls, 100% of nulls point to a zero length array: Collecting 100 samples in estimated 5.0134 s (1.4
array_agg i64 merge_batch 30% nulls, 100% of nulls point to a zero length array
time: [3.6204 µs 3.6433 µs 3.6793 µs]
change: [+0.4207% +0.9048% +1.4450%] (p = 0.00 < 0.05)
Change within noise threshold.
Found 23 outliers among 100 measurements (23.00%)
2 (2.00%) low severe
4 (4.00%) low mild
8 (8.00%) high mild
9 (9.00%) high severe
Benchmarking array_agg i64 merge_batch 70% nulls, 100% of nulls point to a zero length array: Collecting 100 samples in estimated 5.0176 s (1.4
array_agg i64 merge_batch 70% nulls, 100% of nulls point to a zero length array
time: [3.5311 µs 3.5430 µs 3.5560 µs]
change: [-3.9977% -2.2613% -0.8169%] (p = 0.00 < 0.05)
Change within noise threshold.
Found 6 outliers among 100 measurements (6.00%)
6 (6.00%) high mild
Benchmarking array_agg i64 merge_batch 30% nulls, 99% of nulls point to a zero length array: Collecting 100 samples in estimated 5.0411 s (1800
array_agg i64 merge_batch 30% nulls, 99% of nulls point to a zero length array
time: [2.8977 ms 2.9160 ms 2.9351 ms]
change: [-4.2235% -1.5538% +0.6217%] (p = 0.25 > 0.05)
No change in performance detected.
Benchmarking array_agg i64 merge_batch 70% nulls, 99% of nulls point to a zero length array: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.0s, enable flat sampling, or reduce sample count to 60.
Benchmarking array_agg i64 merge_batch 70% nulls, 99% of nulls point to a zero length array: Collecting 100 samples in estimated 6.0400 s (5050
array_agg i64 merge_batch 70% nulls, 99% of nulls point to a zero length array
time: [1.2365 ms 1.2409 ms 1.2454 ms]
change: [-0.2990% +0.4915% +1.1750%] (p = 0.20 > 0.05)
No change in performance detected.
Found 2 outliers among 100 measurements (2.00%)
1 (1.00%) high mild
1 (1.00%) high severe
Benchmarking array_agg i64 merge_batch 30% nulls, 90% of nulls point to a zero length array: Collecting 100 samples in estimated 5.0041 s (1800
array_agg i64 merge_batch 30% nulls, 90% of nulls point to a zero length array
time: [2.8680 ms 2.8800 ms 2.8922 ms]
change: [-1.2448% -0.5287% +0.1313%] (p = 0.14 > 0.05)
No change in performance detected.
Benchmarking array_agg i64 merge_batch 70% nulls, 90% of nulls point to a zero length array: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.0s, enable flat sampling, or reduce sample count to 60.
Benchmarking array_agg i64 merge_batch 70% nulls, 90% of nulls point to a zero length array: Collecting 100 samples in estimated 6.0455 s (5050
array_agg i64 merge_batch 70% nulls, 90% of nulls point to a zero length array
time: [1.2301 ms 1.2346 ms 1.2399 ms]
change: [-15.122% -9.6579% -5.7337%] (p = 0.00 < 0.05)
Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
3 (3.00%) high mild
2 (2.00%) high severe
Benchmarking array_agg i64 merge_batch 30% nulls, 50% of nulls point to a zero length array: Collecting 100 samples in estimated 5.1633 s (1800
array_agg i64 merge_batch 30% nulls, 50% of nulls point to a zero length array
time: [2.8908 ms 2.9089 ms 2.9333 ms]
change: [-15.091% -10.045% -5.6585%] (p = 0.00 < 0.05)
Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
1 (1.00%) high mild
1 (1.00%) high severe
Benchmarking array_agg i64 merge_batch 70% nulls, 50% of nulls point to a zero length array: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.0s, enable flat sampling, or reduce sample count to 60.
Benchmarking array_agg i64 merge_batch 70% nulls, 50% of nulls point to a zero length array: Collecting 100 samples in estimated 6.0271 s (5050
array_agg i64 merge_batch 70% nulls, 50% of nulls point to a zero length array
time: [1.2194 ms 1.2227 ms 1.2266 ms]
change: [-10.366% -7.3667% -4.7084%] (p = 0.00 < 0.05)
Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
7 (7.00%) high mild
1 (1.00%) high severe
Benchmarking array_agg i64 merge_batch 30% nulls, 0% of nulls point to a zero length array: Collecting 100 samples in estimated 5.0895 s (1800
array_agg i64 merge_batch 30% nulls, 0% of nulls point to a zero length array
time: [2.9292 ms 2.9877 ms 3.0750 ms]
change: [+0.6420% +2.9278% +6.0458%] (p = 0.02 < 0.05)
Change within noise threshold.
Found 6 outliers among 100 measurements (6.00%)
3 (3.00%) high mild
3 (3.00%) high severe
Benchmarking array_agg i64 merge_batch 70% nulls, 0% of nulls point to a zero length array: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.3s, enable flat sampling, or reduce sample count to 60.
Benchmarking array_agg i64 merge_batch 70% nulls, 0% of nulls point to a zero length array: Collecting 100 samples in estimated 6.3183 s (5050
array_agg i64 merge_batch 70% nulls, 0% of nulls point to a zero length array
time: [1.2424 ms 1.2479 ms 1.2540 ms]
change: [-8.0349% -5.5341% -3.2153%] (p = 0.00 < 0.05)
Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
2 (2.00%) high mild
5 (5.00%) high severe
acc1.merge_batch(&[Arc::new(a1.slice(0, 1))])?; | ||
acc2.merge_batch(&[Arc::new(a2.slice(0, 1))])?; | ||
|
||
acc1 = merge(acc1, acc2)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The merge_batch
functions do not receive arbitrary data, it receives the results of calling state()
in other accumulators. A fairer test would be to do something like:
acc1.merge_batch(&[Arc::new(a1.slice(0, 1))])?; | |
acc2.merge_batch(&[Arc::new(a2.slice(0, 1))])?; | |
acc1 = merge(acc1, acc2)?; | |
acc1.update_batch(&[Arc::new(a1.slice(0, 1))])?; | |
acc2.update_batch(&[Arc::new(a2.slice(0, 1))])?; | |
acc1 = merge(acc1, acc2)?; |
With this, you would notice that the test result is the same regardless of the changes in this PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EDIT: I'm seeing that there's cases where the datafusion/datafusion/physical-plan/src/aggregates/no_grouping.rs Lines 232 to 239 in 9278233
Which means that probably we do want compaction to happen also in the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is good for a review cc @alamb
Thanks for the code and review @gabotechs and @sfluor |
There appears to be an array_agg benchmark -- I will run that on this PR to see what it shows |
This comment was marked as outdated.
This comment was marked as outdated.
1 similar comment
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
🤖 |
🤖: Benchmark completed Details
|
🤔 the benchmarks show a significant regression in performance (10x in some cases) I think we need to resolve that prior to merging this in We have some documentation on how to profile here: https://datafusion.apache.org/library-user-guide/profiling.html The benchmarks can be run locally like this cargo bench --bench array_agg I think it might be worth explroing: #16519 (comment) |
Which issue does this PR close?
See this issue: #16517
array_agg
function #16517.What changes are included in this PR?
Fixes the over-accounting in
array_agg
functionsAre these changes tested?
Added a test that shows the problem.
Are there any user-facing changes?
Cloning the data might lead to slightly higher "real" memory usage.