Skip to content

fix: Incorrect memory accounting in array_agg function #16519

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

sfluor
Copy link
Contributor

@sfluor sfluor commented Jun 23, 2025

Which issue does this PR close?

See this issue: #16517

What changes are included in this PR?

Fixes the over-accounting in array_agg functions

Are these changes tested?

Added a test that shows the problem.

Are there any user-facing changes?

Cloning the data might lead to slightly higher "real" memory usage.

@github-actions github-actions bot added the functions Changes to functions implementation label Jun 23, 2025
@sfluor sfluor force-pushed the sami/fix-overaccounting-of-memory-in-array-agg branch from d4a69ce to 1da3e04 Compare June 23, 2025 16:17
@sfluor sfluor marked this pull request as ready for review June 24, 2025 08:01
Comment on lines +344 to 349
// The ArrayRef might be holding a reference to its original input buffer, so
// storing it here directly copied/compacted avoids over accounting memory
// not used here.
self.values
.push(make_array(copy_array_data(&values.to_data())));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 I'm not sure if this will solve the issue. Keep in mind that the merge_batch method argument receives the states of other accumulators, which already hold "compacted" data, so I'd expect this compaction here to be unnecessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should add a special case to copy_array_data to avoid copying the data when it already is only a single row / has no offset 🤔

Right now it seems to copy the data unconditionally which is a non trivial overhead on each row 🤔

pub fn copy_array_data(src_data: &ArrayData) -> ArrayData {
let mut copy = MutableArrayData::new(vec![&src_data], true, src_data.len());
copy.extend(0, 0, src_data.len());
copy.freeze()

Perhaps we can do that as a follow of PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something along those lines ? (potentially handling all the primitive types and not just int64)

pub fn copy_if_small(inp: &dyn Array) -> Option<ArrayRef> {
    let ratio = 20;
    match inp.as_any().downcast_ref::<Int64Array>() {
        Some(iarr) => {
            // Only copy the array if it's really smaller than the backing buffer.
            let capacity = iarr.values().inner().capacity();
            let length = iarr.values().inner().len();

            if ratio * length < capacity {
                Some(make_array(copy_array_data(&inp.to_data())))
            } else {
                None
            }
        }
        None => {
            // Default to copy if we cannot infer the type.
            Some(make_array(copy_array_data(&inp.to_data())))
        }
    }
}

The problem with this is that we will probably have to do it for each of the internal arrow array types (primitives + strings + ??).

I tried to find a way to access the size of the topmost parent array regardless of the array type but couldn't find one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe to clarify, I did try this approach and the benchmarks shows on-par performance but I'm wondering if maintenability wise it will be alright given the multitude of arrow types we would need to support (I only did it for int64array since this is what the benchmarks are using):

Gnuplot not found, using plotters backend
array_agg i64 merge_batch no nulls
                        time:   [59.457 ns 60.325 ns 61.260 ns]
                        change: [+7.3648% +8.6679% +10.200%] (p = 0.00 < 0.05)
                        Performance has regressed.

Benchmarking array_agg i64 merge_batch all nulls, 100% of nulls point to a zero length array: Collecting 100 samples in estimated 5.0000 s (135
array_agg i64 merge_batch all nulls, 100% of nulls point to a zero length array
                        time:   [36.705 ns 37.023 ns 37.408 ns]
                        change: [+2.9532% +3.9617% +5.0337%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high severe

Benchmarking array_agg i64 merge_batch all nulls, 90% of nulls point to a zero length array: Collecting 100 samples in estimated 5.0000 s (135M
array_agg i64 merge_batch all nulls, 90% of nulls point to a zero length array
                        time:   [36.260 ns 36.470 ns 36.725 ns]
                        change: [+1.7526% +2.6531% +3.4398%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

Benchmarking array_agg i64 merge_batch 30% nulls, 100% of nulls point to a zero length array: Collecting 100 samples in estimated 5.0134 s (1.4
array_agg i64 merge_batch 30% nulls, 100% of nulls point to a zero length array
                        time:   [3.6204 µs 3.6433 µs 3.6793 µs]
                        change: [+0.4207% +0.9048% +1.4450%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 23 outliers among 100 measurements (23.00%)
  2 (2.00%) low severe
  4 (4.00%) low mild
  8 (8.00%) high mild
  9 (9.00%) high severe

Benchmarking array_agg i64 merge_batch 70% nulls, 100% of nulls point to a zero length array: Collecting 100 samples in estimated 5.0176 s (1.4
array_agg i64 merge_batch 70% nulls, 100% of nulls point to a zero length array
                        time:   [3.5311 µs 3.5430 µs 3.5560 µs]
                        change: [-3.9977% -2.2613% -0.8169%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 6 outliers among 100 measurements (6.00%)
  6 (6.00%) high mild

Benchmarking array_agg i64 merge_batch 30% nulls, 99% of nulls point to a zero length array: Collecting 100 samples in estimated 5.0411 s (1800
array_agg i64 merge_batch 30% nulls, 99% of nulls point to a zero length array
                        time:   [2.8977 ms 2.9160 ms 2.9351 ms]
                        change: [-4.2235% -1.5538% +0.6217%] (p = 0.25 > 0.05)
                        No change in performance detected.

Benchmarking array_agg i64 merge_batch 70% nulls, 99% of nulls point to a zero length array: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.0s, enable flat sampling, or reduce sample count to 60.
Benchmarking array_agg i64 merge_batch 70% nulls, 99% of nulls point to a zero length array: Collecting 100 samples in estimated 6.0400 s (5050
array_agg i64 merge_batch 70% nulls, 99% of nulls point to a zero length array
                        time:   [1.2365 ms 1.2409 ms 1.2454 ms]
                        change: [-0.2990% +0.4915% +1.1750%] (p = 0.20 > 0.05)
                        No change in performance detected.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe

Benchmarking array_agg i64 merge_batch 30% nulls, 90% of nulls point to a zero length array: Collecting 100 samples in estimated 5.0041 s (1800
array_agg i64 merge_batch 30% nulls, 90% of nulls point to a zero length array
                        time:   [2.8680 ms 2.8800 ms 2.8922 ms]
                        change: [-1.2448% -0.5287% +0.1313%] (p = 0.14 > 0.05)
                        No change in performance detected.

Benchmarking array_agg i64 merge_batch 70% nulls, 90% of nulls point to a zero length array: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.0s, enable flat sampling, or reduce sample count to 60.
Benchmarking array_agg i64 merge_batch 70% nulls, 90% of nulls point to a zero length array: Collecting 100 samples in estimated 6.0455 s (5050
array_agg i64 merge_batch 70% nulls, 90% of nulls point to a zero length array
                        time:   [1.2301 ms 1.2346 ms 1.2399 ms]
                        change: [-15.122% -9.6579% -5.7337%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  3 (3.00%) high mild
  2 (2.00%) high severe

Benchmarking array_agg i64 merge_batch 30% nulls, 50% of nulls point to a zero length array: Collecting 100 samples in estimated 5.1633 s (1800
array_agg i64 merge_batch 30% nulls, 50% of nulls point to a zero length array
                        time:   [2.8908 ms 2.9089 ms 2.9333 ms]
                        change: [-15.091% -10.045% -5.6585%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe

Benchmarking array_agg i64 merge_batch 70% nulls, 50% of nulls point to a zero length array: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.0s, enable flat sampling, or reduce sample count to 60.
Benchmarking array_agg i64 merge_batch 70% nulls, 50% of nulls point to a zero length array: Collecting 100 samples in estimated 6.0271 s (5050
array_agg i64 merge_batch 70% nulls, 50% of nulls point to a zero length array
                        time:   [1.2194 ms 1.2227 ms 1.2266 ms]
                        change: [-10.366% -7.3667% -4.7084%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  7 (7.00%) high mild
  1 (1.00%) high severe

Benchmarking array_agg i64 merge_batch 30% nulls, 0% of nulls point to a zero length array: Collecting 100 samples in estimated 5.0895 s (1800
array_agg i64 merge_batch 30% nulls, 0% of nulls point to a zero length array
                        time:   [2.9292 ms 2.9877 ms 3.0750 ms]
                        change: [+0.6420% +2.9278% +6.0458%] (p = 0.02 < 0.05)
                        Change within noise threshold.
Found 6 outliers among 100 measurements (6.00%)
  3 (3.00%) high mild
  3 (3.00%) high severe

Benchmarking array_agg i64 merge_batch 70% nulls, 0% of nulls point to a zero length array: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.3s, enable flat sampling, or reduce sample count to 60.
Benchmarking array_agg i64 merge_batch 70% nulls, 0% of nulls point to a zero length array: Collecting 100 samples in estimated 6.3183 s (5050
array_agg i64 merge_batch 70% nulls, 0% of nulls point to a zero length array
                        time:   [1.2424 ms 1.2479 ms 1.2540 ms]
                        change: [-8.0349% -5.5341% -3.2153%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
  2 (2.00%) high mild
  5 (5.00%) high severe

Comment on lines +1022 to +1025
acc1.merge_batch(&[Arc::new(a1.slice(0, 1))])?;
acc2.merge_batch(&[Arc::new(a2.slice(0, 1))])?;

acc1 = merge(acc1, acc2)?;
Copy link
Contributor

@gabotechs gabotechs Jun 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The merge_batch functions do not receive arbitrary data, it receives the results of calling state() in other accumulators. A fairer test would be to do something like:

Suggested change
acc1.merge_batch(&[Arc::new(a1.slice(0, 1))])?;
acc2.merge_batch(&[Arc::new(a2.slice(0, 1))])?;
acc1 = merge(acc1, acc2)?;
acc1.update_batch(&[Arc::new(a1.slice(0, 1))])?;
acc2.update_batch(&[Arc::new(a2.slice(0, 1))])?;
acc1 = merge(acc1, acc2)?;

With this, you would notice that the test result is the same regardless of the changes in this PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gabotechs
Copy link
Contributor

EDIT: I'm seeing that there's cases where the merge_batch method is used for something other than merging states:

let res = match mode {
AggregateMode::Partial
| AggregateMode::Single
| AggregateMode::SinglePartitioned => accum.update_batch(&values),
AggregateMode::Final | AggregateMode::FinalPartitioned => {
accum.merge_batch(&values)
}
};

Which means that probably we do want compaction to happen also in the merge_batch function. I think this is good to go then 👍

Copy link
Contributor

@gabotechs gabotechs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good for a review cc @alamb

@alamb
Copy link
Contributor

alamb commented Jun 25, 2025

Thanks for the code and review @gabotechs and @sfluor

alamb
alamb previously approved these changes Jun 25, 2025
@alamb
Copy link
Contributor

alamb commented Jun 25, 2025

There appears to be an array_agg benchmark -- I will run that on this PR to see what it shows

@alamb

This comment was marked as outdated.

1 similar comment
@alamb

This comment was marked as outdated.

@alamb

This comment was marked as outdated.

@alamb
Copy link
Contributor

alamb commented Jun 25, 2025

🤖 ./gh_compare_branch_bench.sh Benchmark Script Running
Linux aal-dev 6.11.0-1015-gcp #15~24.04.1-Ubuntu SMP Thu Apr 24 20:41:05 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing sami/fix-overaccounting-of-memory-in-array-agg (1da3e04) to b6c8cc5 diff
BENCH_NAME=array_agg
BENCH_COMMAND=cargo bench --bench array_agg
BENCH_FILTER=
BENCH_BRANCH_NAME=sami_fix-overaccounting-of-memory-in-array-agg
Results will be posted here when complete

@alamb
Copy link
Contributor

alamb commented Jun 25, 2025

🤖: Benchmark completed

Details

group                                                                              main                                   sami_fix-overaccounting-of-memory-in-array-agg
-----                                                                              ----                                   ----------------------------------------------
array_agg i64 merge_batch 30% nulls, 0% of nulls point to a zero length array      1.00    547.4±7.12µs        ? ?/sec    9.08      5.0±0.03ms        ? ?/sec
array_agg i64 merge_batch 30% nulls, 100% of nulls point to a zero length array    1.00      6.1±0.03µs        ? ?/sec    7.26     44.5±0.85µs        ? ?/sec
array_agg i64 merge_batch 30% nulls, 50% of nulls point to a zero length array     1.00    547.7±1.36µs        ? ?/sec    9.28      5.1±0.02ms        ? ?/sec
array_agg i64 merge_batch 30% nulls, 90% of nulls point to a zero length array     1.00    548.2±0.87µs        ? ?/sec    9.38      5.1±0.02ms        ? ?/sec
array_agg i64 merge_batch 30% nulls, 99% of nulls point to a zero length array     1.00    561.4±4.24µs        ? ?/sec    9.12      5.1±0.06ms        ? ?/sec
array_agg i64 merge_batch 70% nulls, 0% of nulls point to a zero length array      1.00    243.2±1.64µs        ? ?/sec    8.13  1977.2±10.35µs        ? ?/sec
array_agg i64 merge_batch 70% nulls, 100% of nulls point to a zero length array    1.00      5.9±0.02µs        ? ?/sec    3.96     23.2±0.43µs        ? ?/sec
array_agg i64 merge_batch 70% nulls, 50% of nulls point to a zero length array     1.00    242.1±0.26µs        ? ?/sec    8.39      2.0±0.02ms        ? ?/sec
array_agg i64 merge_batch 70% nulls, 90% of nulls point to a zero length array     1.00    243.5±0.44µs        ? ?/sec    8.08  1968.8±15.78µs        ? ?/sec
array_agg i64 merge_batch 70% nulls, 99% of nulls point to a zero length array     1.00    243.0±0.62µs        ? ?/sec    8.26      2.0±0.01ms        ? ?/sec
array_agg i64 merge_batch all nulls, 100% of nulls point to a zero length array    1.00     86.9±0.11ns        ? ?/sec    1.01     87.6±0.14ns        ? ?/sec
array_agg i64 merge_batch all nulls, 90% of nulls point to a zero length array     1.00     86.9±0.09ns        ? ?/sec    1.01     87.6±0.15ns        ? ?/sec
array_agg i64 merge_batch no nulls                                                 1.00    100.8±0.12ns        ? ?/sec    544.14    54.8±3.04µs        ? ?/sec

@alamb alamb dismissed their stale review June 26, 2025 17:57

Bnchmarks show significant regression

@alamb
Copy link
Contributor

alamb commented Jun 26, 2025

🤔 the benchmarks show a significant regression in performance (10x in some cases)

I think we need to resolve that prior to merging this in

We have some documentation on how to profile here: https://datafusion.apache.org/library-user-guide/profiling.html

The benchmarks can be run locally like this

cargo bench --bench array_agg

I think it might be worth explroing: #16519 (comment)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
functions Changes to functions implementation
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Incorrect memory accounting in array_agg function
3 participants