
Intermediate result blocked approach to aggregation memory management #15591


Draft: @Rachelint wants to merge 90 commits into main from intermeidate-result-blocked-approach

Conversation

@Rachelint (Contributor) commented Apr 5, 2025

Which issue does this PR close?

Rationale for this change

As mentioned in #7065, we use a single `Vec` to manage aggregation intermediate results in both `GroupAccumulator` and `GroupValues`.

This is simple, but not efficient enough for high-cardinality aggregation, because when the `Vec` is not large enough we need to allocate a new `Vec` and copy all data over from the old one.

  • Copying a large amount of data (due to high cardinality) is obviously expensive
  • It is also unfriendly to the CPU (it invalidates caches and the TLB)

So this PR introduces a blocked approach to managing the aggregation intermediate results. In this approach we never resize the `Vec`; instead, we split the data into blocks, and when capacity runs out we simply allocate a new block. See #7065 for details, and the sketch below.
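For illustration, here is a minimal sketch of the idea (a hypothetical type, not this PR's actual implementation), assuming fixed-size blocks of `Default`-initialized values:

    /// Hedged sketch of blocked intermediate-result storage.
    struct BlockedValues<T> {
        blocks: Vec<Vec<T>>,
        block_size: usize,
    }

    impl<T: Default + Clone> BlockedValues<T> {
        /// Grow to hold at least `total` values. Existing blocks are never
        /// resized or copied; growth only appends new fixed-size blocks.
        fn ensure_capacity(&mut self, total: usize) {
            while self.blocks.len() * self.block_size < total {
                self.blocks.push(vec![T::default(); self.block_size]);
            }
        }

        /// A flat group index splits into (block id, offset within block).
        fn value_mut(&mut self, index: usize) -> &mut T {
            let (blk, off) = (index / self.block_size, index % self.block_size);
            &mut self.blocks[blk][off]
        }
    }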

What changes are included in this PR?

  • Implement the framework for the blocked approach
  • Implement blocked-group support in `PrimitiveGroupsAccumulator` and `GroupValuesPrimitive` as examples

Are these changes tested?

Covered by existing tests, plus new unit tests and new fuzz tests.

Are there any user-facing changes?

Two functions are added to the `GroupValues` and `GroupAccumulator` traits.

Both have default implementations, so users can opt in to the blocked approach only when they want better performance for their UDAFs.

    /// Returns `true` if this accumulator supports blocked groups.
    fn supports_blocked_groups(&self) -> bool {
        false
    }

    /// Alter the block size in the accumulator.
    ///
    /// If the target block size is `None`, a single large block
    /// (think of it as one big `Vec`) is used to manage the state.
    ///
    /// If the target block size is `Some(blk_size)`, it will try to
    /// set the block size to `blk_size`; this only succeeds when the
    /// accumulator supports blocked mode.
    ///
    /// NOTICE: After altering the block size, all previously
    /// accumulated data will be cleared.
    ///
    fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<()> {
        if block_size.is_some() {
            return Err(DataFusionError::NotImplemented(
                "this accumulator doesn't support blocked mode yet".to_string(),
            ));
        }

        Ok(())
    }

@Rachelint Rachelint changed the title Impl Intermeidate result blocked approach framework Impl intermeidate result blocked approach framework Apr 5, 2025
@Rachelint Rachelint changed the title Impl intermeidate result blocked approach framework Impl intermeidate result blocked approach sketch Apr 5, 2025
@github-actions github-actions bot added the logical-expr Logical plan and expressions label Apr 5, 2025
@Dandandan (Contributor)

Hi @Rachelint, I think I have an alternative proposal that seems relatively easy to implement.
I'll share it with you once I have some time to validate the design (probably this evening).

@Rachelint (Contributor, Author) commented Apr 8, 2025

Hi @Rachelint, I think I have an alternative proposal that seems relatively easy to implement. I'll share it with you once I have some time to validate the design (probably this evening).

Thanks a lot. The design in this PR indeed still introduces quite a few code changes...

I first tried not to modify anything about `GroupAccumulator`:

  • Only implement the blocked logic in `GroupValues`
  • Then reorder the input batch according to the block indices obtained from `GroupValues`
  • Apply the input batch to the related `GroupAccumulator` using slices
  • When a new block is needed, create a new `GroupAccumulator` (one block per `GroupAccumulator`)

But I found this approach introduces too much extra cost...

Maybe we could pass the block indices into `values` in `merge_batch`/`update_batch` as an `Array`?

@Rachelint Rachelint force-pushed the intermeidate-result-blocked-approach branch 2 times, most recently from cc37eba to f690940 Compare April 9, 2025 14:37
@github-actions github-actions bot added the functions Changes to functions implementation label Apr 10, 2025
@Rachelint Rachelint force-pushed the intermeidate-result-blocked-approach branch from 95c6a36 to a4c6f42 Compare April 10, 2025 11:10
@github-actions github-actions bot added the physical-expr Changes to the physical-expr crates label Apr 10, 2025
@Rachelint Rachelint force-pushed the intermeidate-result-blocked-approach branch 6 times, most recently from 2100a5b to 0ee951c Compare April 17, 2025 11:56
@Rachelint (Contributor, Author) commented Apr 17, 2025

Development (and testing) of all the needed common structs is finished!
The remaining work for this one:

  • Support blocked-related logic in `GroupedHashAggregateStream` (we can copy it from the sketch in #11943)
  • Logic for deciding when to enable this optimization
  • Example blocked versions of `GroupAccumulator` and `GroupValues`
  • Unit tests for the blocked `GroupValuesPrimitive` (it is a bit complex)
  • Fuzz tests
  • Chore: fix docs, fix clippy, add more comments...

@Rachelint Rachelint force-pushed the intermeidate-result-blocked-approach branch 2 times, most recently from c51d409 to 2863809 Compare April 20, 2025 14:46
@github-actions github-actions bot added execution Related to the execution crate common Related to common crate sqllogictest SQL Logic Tests (.slt) labels Apr 21, 2025
@Rachelint (Contributor, Author)

It is very close, just need to add more tests!

@Rachelint Rachelint force-pushed the intermeidate-result-blocked-approach branch 3 times, most recently from 31d660d to 2b8dd1e Compare April 22, 2025 18:52
@Rachelint Rachelint marked this pull request as draft May 19, 2025 09:54
@Rachelint (Contributor, Author)

@alamb @Dandandan Sorry, I am still working on some new ideas for this PR, so I am switching it to draft again.

@alamb (Contributor) commented May 19, 2025

🤖 ./gh_compare_branch.sh Benchmark Script Running
Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing intermeidate-result-blocked-approach (600318f) to 07fe23f diff
Benchmarks: clickbench_1 tpch_mem
Results will be posted here when complete

@alamb (Contributor) commented May 19, 2025

@alamb Hello, is it ok to trigger the benchmark again?

Will do

@alamb @Dandandan Sorry, I am still working on some new ideas for this PR, so I am switching it to draft again.

No worries -- I am sorry I have not given it as much attention as I would like. Is there anything else I can do to help (like implement one of the grouped memory examples)? I am about to file a ticket to start organizing the follow-on work

@Dandandan (Contributor)

@alamb @Dandandan Sorry, I am still working on some new ideas for this PR, so I am switching it to draft again.

No worries

@Rachelint (Contributor, Author)

Will do

Thanks!

No worries -- I am sorry I have not given it as much attention as I would like. Is there anything else I can do to help (like implement one of the grouped memory examples)? I am about to file a ticket to start organizing the follow-on work

I think I still need some time to evaluate what benefits this brings.

After experimenting, I found:

But inspired by the batch_size-based memory allocation, I am wondering whether we can find some ways to reuse memory. I am trying that today.

@alamb (Contributor) commented May 19, 2025

🤖: Benchmark completed

Details

Comparing HEAD and intermeidate-result-blocked-approach
--------------------
Benchmark clickbench_1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       HEAD ┃ intermeidate-result-blocked-approach ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │     0.64ms │                               0.61ms │ +1.06x faster │
│ QQuery 1     │    73.32ms │                              77.79ms │  1.06x slower │
│ QQuery 2     │   116.11ms │                             121.93ms │  1.05x slower │
│ QQuery 3     │   127.64ms │                             128.08ms │     no change │
│ QQuery 4     │   650.79ms │                             627.02ms │     no change │
│ QQuery 5     │   862.07ms │                             864.44ms │     no change │
│ QQuery 6     │     0.76ms │                               0.71ms │ +1.07x faster │
│ QQuery 7     │    89.64ms │                              96.33ms │  1.07x slower │
│ QQuery 8     │   945.07ms │                             945.95ms │     no change │
│ QQuery 9     │  1243.07ms │                            1248.13ms │     no change │
│ QQuery 10    │   294.76ms │                             302.60ms │     no change │
│ QQuery 11    │   327.74ms │                             342.77ms │     no change │
│ QQuery 12    │   876.40ms │                             901.09ms │     no change │
│ QQuery 13    │  1301.86ms │                            1359.55ms │     no change │
│ QQuery 14    │   864.23ms │                             850.71ms │     no change │
│ QQuery 15    │   845.06ms │                             852.93ms │     no change │
│ QQuery 16    │  1750.37ms │                            1757.44ms │     no change │
│ QQuery 17    │  1605.74ms │                            1621.73ms │     no change │
│ QQuery 18    │  3082.30ms │                            3123.52ms │     no change │
│ QQuery 19    │   124.11ms │                             124.83ms │     no change │
│ QQuery 20    │  1179.74ms │                            1168.95ms │     no change │
│ QQuery 21    │  1413.11ms │                            1400.95ms │     no change │
│ QQuery 22    │  2537.54ms │                            2451.09ms │     no change │
│ QQuery 23    │  8489.89ms │                            8532.97ms │     no change │
│ QQuery 24    │   511.43ms │                             505.62ms │     no change │
│ QQuery 25    │   427.56ms │                             425.53ms │     no change │
│ QQuery 26    │   577.57ms │                             577.86ms │     no change │
│ QQuery 27    │  1707.08ms │                            1709.85ms │     no change │
│ QQuery 28    │ 12952.06ms │                           13194.62ms │     no change │
│ QQuery 29    │   573.95ms │                             570.25ms │     no change │
│ QQuery 30    │   846.22ms │                             855.72ms │     no change │
│ QQuery 31    │   903.90ms │                             896.13ms │     no change │
│ QQuery 32    │  2682.87ms │                            2680.35ms │     no change │
│ QQuery 33    │  3413.44ms │                            3409.35ms │     no change │
│ QQuery 34    │  3453.88ms │                            3441.08ms │     no change │
│ QQuery 35    │  1287.13ms │                            1278.56ms │     no change │
│ QQuery 36    │   175.07ms │                             178.36ms │     no change │
│ QQuery 37    │   104.82ms │                             103.96ms │     no change │
│ QQuery 38    │   174.73ms │                             172.98ms │     no change │
│ QQuery 39    │   261.50ms │                             258.02ms │     no change │
│ QQuery 40    │    87.21ms │                              89.21ms │     no change │
│ QQuery 41    │    86.33ms │                              85.57ms │     no change │
│ QQuery 42    │    82.95ms │                              79.43ms │     no change │
└──────────────┴────────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                   ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)                                   │ 59111.65ms │
│ Total Time (intermeidate-result-blocked-approach)   │ 59414.57ms │
│ Average Time (HEAD)                                 │  1374.69ms │
│ Average Time (intermeidate-result-blocked-approach) │  1381.73ms │
│ Queries Faster                                      │          2 │
│ Queries Slower                                      │          3 │
│ Queries with No Change                              │         38 │
└─────────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     HEAD ┃ intermeidate-result-blocked-approach ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 119.85ms │                             123.00ms │     no change │
│ QQuery 2     │  22.89ms │                              24.18ms │  1.06x slower │
│ QQuery 3     │  34.42ms │                              34.48ms │     no change │
│ QQuery 4     │  20.45ms │                              20.75ms │     no change │
│ QQuery 5     │  54.12ms │                              55.55ms │     no change │
│ QQuery 6     │  12.15ms │                              12.01ms │     no change │
│ QQuery 7     │ 104.41ms │                             107.42ms │     no change │
│ QQuery 8     │  25.93ms │                              27.42ms │  1.06x slower │
│ QQuery 9     │  61.78ms │                              63.54ms │     no change │
│ QQuery 10    │  58.64ms │                              57.95ms │     no change │
│ QQuery 11    │  13.10ms │                              13.86ms │  1.06x slower │
│ QQuery 12    │  43.69ms │                              46.48ms │  1.06x slower │
│ QQuery 13    │  29.74ms │                              30.87ms │     no change │
│ QQuery 14    │  10.18ms │                              10.02ms │     no change │
│ QQuery 15    │  25.08ms │                              27.64ms │  1.10x slower │
│ QQuery 16    │  23.21ms │                              22.51ms │     no change │
│ QQuery 17    │ 101.75ms │                             103.30ms │     no change │
│ QQuery 18    │ 248.41ms │                             244.84ms │     no change │
│ QQuery 19    │  28.58ms │                              26.58ms │ +1.08x faster │
│ QQuery 20    │  39.04ms │                              39.30ms │     no change │
│ QQuery 21    │ 173.27ms │                             172.60ms │     no change │
│ QQuery 22    │  17.97ms │                              17.79ms │     no change │
└──────────────┴──────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                                   ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)                                   │ 1268.64ms │
│ Total Time (intermeidate-result-blocked-approach)   │ 1282.10ms │
│ Average Time (HEAD)                                 │   57.67ms │
│ Average Time (intermeidate-result-blocked-approach) │   58.28ms │
│ Queries Faster                                      │         1 │
│ Queries Slower                                      │         5 │
│ Queries with No Change                              │        16 │
└─────────────────────────────────────────────────────┴───────────┘

@alamb (Contributor) commented May 19, 2025

🤖 ./gh_compare_branch.sh Benchmark Script Running
Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing intermeidate-result-blocked-approach (600318f) to 07fe23f diff
Benchmarks: clickbench_extended
Results will be posted here when complete

@alamb (Contributor) commented May 19, 2025

🤖: Benchmark completed

Details

Comparing HEAD and intermeidate-result-blocked-approach
--------------------
Benchmark clickbench_extended.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       HEAD ┃ intermeidate-result-blocked-approach ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │  1916.94ms │                            1866.52ms │     no change │
│ QQuery 1     │   709.88ms │                             673.73ms │ +1.05x faster │
│ QQuery 2     │  1482.93ms │                            1391.57ms │ +1.07x faster │
│ QQuery 3     │   701.88ms │                             721.01ms │     no change │
│ QQuery 4     │  1494.91ms │                            1494.55ms │     no change │
│ QQuery 5     │ 15627.54ms │                           15418.84ms │     no change │
│ QQuery 6     │  2070.08ms │                            2042.39ms │     no change │
│ QQuery 7     │  2108.54ms │                            1952.66ms │ +1.08x faster │
└──────────────┴────────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                   ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)                                   │ 26112.69ms │
│ Total Time (intermeidate-result-blocked-approach)   │ 25561.26ms │
│ Average Time (HEAD)                                 │  3264.09ms │
│ Average Time (intermeidate-result-blocked-approach) │  3195.16ms │
│ Queries Faster                                      │          3 │
│ Queries Slower                                      │          0 │
│ Queries with No Change                              │          5 │
└─────────────────────────────────────────────────────┴────────────┘

@Dandandan (Contributor) commented May 19, 2025

  • We perform a few large memory allocations in flat mode, but many more small memory allocations in blocked mode
  • The memory is no longer contiguous; I think that may be less friendly to the CPU (e.g. cache prefetching?)

I wonder what happens if we make it at least 1 million entries or 1 MiB, so the effect on cache-friendliness is smaller?

We could optimize the growing strategy for the first allocated `Vec` if the memory usage / overhead of the first block is a concern.

@alamb (Contributor) commented May 19, 2025

BTW, I am confused about why there are so many page faults in the blocked accumulator.

Me too -- I looked at the flamegraph you provided and I agree it seems like almost half the allocation time is spent with pagefaults / zeroing memory. However, I can't tell if that is because there is slowness with the underlying Vec that wasn't initialized or if there is something else going on.

[Screenshot: flamegraph, 2025-05-19]

  • But it may not really help performance much currently (the performance improvement is mainly due to removing the use of the expensive `slice`).

Yes, that was my understanding -- that blocked aggregation would only help performance when the number of intermediate groups was large (which forced additional memory allocations)

But inspired by the batch_size-based memory allocation, I am wondering whether we can find some ways to reuse memory. I am trying that today.

I suspect you already know this, but I think you can get back the original Vec from an array via

  1. PrimitiveArray::into_parts() --> get a ScalarBuffer
  2. ScalarBuffer::into_inner() --> get a Buffer
  3. Buffer::into_vec()
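In code, that round trip looks roughly like this (a hedged sketch using arrow-rs APIs; `into_vec` only succeeds when the buffer is uniquely owned):

    use arrow::array::Int64Array;

    let array = Int64Array::from(vec![1i64, 2, 3]);
    // PrimitiveArray -> ScalarBuffer -> Buffer -> Vec
    let (_data_type, scalar_buffer, _nulls) = array.into_parts();
    let buffer = scalar_buffer.into_inner();
    match buffer.into_vec::<i64>() {
        Ok(mut v) => {
            v.clear(); // reuse this allocation for the next round
        }
        Err(_still_shared) => {
            // the buffer had other references; nothing to reuse
        }
    }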

However, in the high cardinality case, I am not sure there are buffers to reuse during aggregation (the buffers are all held until the output is needed, and then once output is needed they don't get re-created)

@alamb (Contributor) commented May 19, 2025

I think I still need some time to evaluate what benefits this brings.

I think I am somewhat lost with the current state. Your comment on #15591 (comment) states

I added a query in extended.sql; the blocked approach gets an obvious improvement, as expected.

I am confident it can improve even more on some other queries, according to the PoC in #11943.

And the most recent benchmark run #15591 (comment) seems to confirm this finding:

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       HEAD ┃ intermeidate-result-blocked-approach ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │  1916.94ms │                            1866.52ms │     no change │
│ QQuery 1     │   709.88ms │                             673.73ms │ +1.05x faster │
│ QQuery 2     │  1482.93ms │                            1391.57ms │ +1.07x faster │
│ QQuery 3     │   701.88ms │                             721.01ms │     no change │
│ QQuery 4     │  1494.91ms │                            1494.55ms │     no change │
│ QQuery 5     │ 15627.54ms │                           15418.84ms │     no change │
│ QQuery 6     │  2070.08ms │                            2042.39ms │     no change │
│ QQuery 7     │  2108.54ms │                            1952.66ms │ +1.08x faster │
└──────────────┴────────────┴──────────────────────────────────────┴───────────────┘

However, some of the results on shorter-running queries (few groups) show perhaps a slowdown: #15591 (comment)

From those numbers, is it a fair assessment that the blocked approach improves performance when there is a large number of intermediate groups, but does not when there is a small number of groups?

fn generate_group_indices(len: usize) -> Vec<usize> {
    (0..len).collect()
}
A review comment on this code:

Does using indices 0..len mean this benchmark is testing the case where each input has a new unique group?

If this is the case, then I think it would make sense that the benchmark shows lots of time allocating / zeroing memory: the bulk of the work will be copying each value into a new accumulator slot.

@Rachelint (Contributor, Author)

I wonder what happens if we make it at least 1 million entries or 1 MiB, so the effect on cache-friendliness is smaller?
We could optimize the growing strategy for the first allocated `Vec` if the memory usage / overhead of the first block is a concern.

I have tried enlarging the block size (8 * batch, 16 * batch, ...), but it seems to make only a slight difference to performance.

So after experimenting, I think single vector + resizing is actually efficient enough...

  • It is more efficient for random access
  • Resizing only happens a few times, so it is actually acceptable

@Rachelint (Contributor, Author) commented May 19, 2025

Me too -- I looked at the flamegraph you provided and I agree it seems like almost half the allocation time is spent with pagefaults / zeroing memory. However, I can't tell if that is because there is slowness with the underlying Vec that wasn't initialized or if there is something else going on.

I think I nearly understand why: it is possibly caused by LTO. LTO seems to have found that the initialization is actually unnecessary, so it removed it (just like calling `set_len` manually).

I suspect you already know this, but I think you can get back the original Vec from an array via

Got it!

From those numbers, is it a fair assessment that the blocked approach improves performance when there is a large number of intermediate groups, but does not when there is a small number of groups?

I think it may be possible in this situation (see the sketch below):

  • When the input for the accumulator and group values is consumed, we collect them and transform them back into `Vec`s.
  • Then we push them back into the accumulator and group values.
  • Finally, we reuse them in the next round of accumulator and group-values computation.
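A hedged sketch of that reuse loop (hypothetical pool type; real integration would live inside the accumulator and group values):

    // Hypothetical scratch pool: Vecs recovered from emitted arrays go
    // in, and the next round's allocations come back out.
    struct VecPool {
        spare: Vec<Vec<i64>>,
    }

    impl VecPool {
        fn put(&mut self, mut v: Vec<i64>) {
            v.clear(); // keep the capacity, drop the contents
            self.spare.push(v);
        }

        fn get(&mut self, capacity: usize) -> Vec<i64> {
            self.spare
                .pop()
                .unwrap_or_else(|| Vec::with_capacity(capacity))
        }
    }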

From those numbers, is it a fair assessment that the blocked approach improves performance when there is a large number of intermediate groups, but does not when there is a small number of groups?

Yes, in the current implementation it only helps performance with a large number of intermediate groups, because `slice` is called many, many times and the cost becomes unacceptable.
For queries with only a small number of groups, it makes nearly no difference.

@Dandandan (Contributor)

Yes, in the current implementation it only helps performance with a large number of intermediate groups, because `slice` is called many, many times and the cost becomes unacceptable.
For queries with only a small number of groups, it makes nearly no difference.

Yeah I think that was expected.
I think we should try to minimize the impact of this on low-cardinality cases (e.g. make sure they fit in one array, minimize the overhead of blocks)...

So after experimenting, I think single vector + resizing is actually efficient enough...

Yeah it is quite efficient, although problematic for large inputs

  • Offsets going out of bounds for utf8 / binary data.
  • Overallocation due to the exponential allocation strategy.

So even with roughly the same performance, I think we should still strive to make the change.

@Rachelint (Contributor, Author) commented May 19, 2025

I wonder what happens if we make it at least 1 million entries or 1 MiB, so the effect on cache-friendliness is smaller?
We could optimize the growing strategy for the first allocated `Vec` if the memory usage / overhead of the first block is a concern.

I think we should try to minimize the impact of this on low-cardinality cases (e.g. make sure they fit in one array, minimize the overhead of blocks)...

If I don't misunderstand, does it mean a strategy like this (see the sketch after this list)?

  • We make the block size large enough
  • For the first block, we still resize at first
  • But after it grows large enough, we switch to the blocked approach?
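For illustration, a hedged sketch of that hybrid strategy (hypothetical types and thresholds, just to pin down the idea):

    // Hypothetical sketch: resize a single Vec while the group count is
    // small, switch to appending fixed-size blocks once it grows large.
    enum GroupStorage<T> {
        Flat(Vec<T>),
        Blocked { blocks: Vec<Vec<T>>, block_size: usize },
    }

    impl<T: Default + Clone> GroupStorage<T> {
        fn grow_to(&mut self, total: usize, threshold: usize, block_size: usize) {
            if let GroupStorage::Flat(v) = self {
                if total <= threshold {
                    // Small: keep resizing the single Vec in place.
                    v.resize(total, T::default());
                    return;
                }
                // Crossed the threshold: the flat Vec becomes block 0, and
                // from now on growth only appends new blocks.
                // NOTE: block 0 keeps its flat length here; real indexing
                // would need to account for that (omitted in this sketch).
                let first = std::mem::take(v);
                *self = GroupStorage::Blocked { blocks: vec![first], block_size };
            }
            if let GroupStorage::Blocked { blocks, block_size } = self {
                let mut len: usize = blocks.iter().map(Vec::len).sum();
                while len < total {
                    blocks.push(vec![T::default(); *block_size]);
                    len += *block_size;
                }
            }
        }
    }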

Yeah it is quite efficient, although problematic for large inputs

Agreed. It also leads to high memory usage, because we only release memory after all the batches are returned (we hold the really large single batch in memory, currently return only slices of it, and release the memory all at once after every slice has been returned).

@Dandandan (Contributor) commented May 19, 2025

We make the block size large enough
For the first block, we still resize at first
But after it grows large enough, we switch to the blocked approach?

Yes - exactly!

@alamb (Contributor) commented May 25, 2025

I wonder what the plan is for this PR?

From what I understand, it currently improves performance for aggregates with large numbers of groups, but (slightly) slows down aggregates with smaller numbers of groups. I think this is due to accessing group storage via two indirections (block index / actual index).

It seems like the proposal is to have some sort of adaptive structure that uses one-part indexes for small numbers of groups and then switches to two-part indexes for larger numbers.
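For reference, the two-part index can be made cheap when the block size is a power of two (a hedged sketch with a hypothetical block size):

    // With a power-of-two block size, splitting a flat group index into
    // (block_id, offset) is one shift and one mask rather than a division.
    const BLOCK_BITS: u32 = 13; // hypothetical: 8192 groups per block
    const BLOCK_MASK: u64 = (1 << BLOCK_BITS) - 1;

    #[inline]
    fn split_group_index(flat: u64) -> (usize, usize) {
        ((flat >> BLOCK_BITS) as usize, (flat & BLOCK_MASK) as usize)
    }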

@Rachelint (Contributor, Author)

I wonder what the plan is for this PR?

From what I understand, it currently improves performance for aggregates with large numbers of groups, but (slightly) slows down aggregates with smaller numbers of groups. I think this is due to accessing group storage via two indirections (block index / actual index).

It seems like the proposal is to have some sort of adaptive structure that uses one-part indexes for small numbers of groups and then switches to two-part indexes for larger numbers.

I am experimenting with what we can do on top of this PR to improve performance further, like memory reuse (#15591 (comment)). Actually, #16135 is part of that attempt.

My biggest concern is whether we can get a more obvious improvement to make the change worthwhile...

@alamb (Contributor) commented May 26, 2025

My biggest concern is whether we can get a more obvious improvement to make the change worthwhile...

As I understand it, the way to get a bigger improvement would be to implement the chunked approach for more group storage / aggregates, so that more queries in our benchmarks (like ClickBench) could use the new code path

Though of course that would make this PR even bigger

We could also make a "POC" type PR with some more implementation to prove out the performance and then break it into smaller pieces for review 🤔

@Rachelint (Contributor, Author) commented May 27, 2025

My biggest concern is whether we can get a more obvious improvement to make the change worthwhile...

As I understand it, the way to get a bigger improvement would be to implement the chunked approach for more group storage / aggregates, so that more queries in our benchmarks (like ClickBench) could use the new code path

Though of course that would make this PR even bigger

We could also make a "POC" type PR with some more implementation to prove out the performance and then break it into smaller pieces for review 🤔

Yes, I am trying to implement it for `count`, and to see whether there are improvements in q4 and q15.

But according to the flamegraph, I found that the bottleneck of all such queries is actually the hash table. I think the performance improvement from this PR will not be obvious until we overcome the hash table (I am experimenting with a ClickHouse-like hash table).

I think the benefits from this one currently are:

  • Part of the better aggregation spilling (I will file an issue to discuss it, maybe tonight)
  • Lower memory usage due to faster, batch-by-batch freeing of memory
  • Making really high-cardinality queries around 1.1x faster

Do you think it is still worth continuing to push this forward for the above benefits?
I plan to sort out the code tonight, and we can make the simplest implementation in the first stage (it is currently a bit complex due to some experiments with further performance improvements).

@adriangb (Contributor)

Part of the better aggregation spilling (I will file an issue to discuss it, maybe tonight)

Without having a full understanding of this PR (I have just been following the conversation because the change is exciting), my 2¢ is: for us, memory management is currently one of the biggest thorns with DataFusion. It is quite hard at the moment to run with a fixed memory budget, given the mix of queries exceeding the budget through under-accounting and of refusing to run operators that can't spill / that try to claim more memory than they actually use.

@Dandandan (Contributor)

I agree with @adriangb: even when it doesn't provide performance improvements, it is still super valuable to push this forward.

@Rachelint (Contributor, Author)

Thanks @adriangb @Dandandan.

I just started my new job this week and am a bit busy, but I will continue to push this forward this weekend.

The new targets for this one may be:

  • Mainly serving as a base for better aggregation spilling
  • Slightly improving performance

So the remaining work, I think, is:

  • File an issue about the new aggregation spilling proposal based on this one
  • Support blocked mode for `NullState` again

@alamb (Contributor) commented May 28, 2025

Thanks for all your help @Rachelint and congratulations on the new job

@alamb alamb changed the title Implement intermediate result blocked approach to aggregation memory management Intermediate result blocked approach to aggregation memory management Jun 2, 2025
@Dandandan (Contributor)

thanks @Rachelint and congratulations!

Labels: common (Related to common crate), core (Core DataFusion crate), documentation (Improvements or additions to documentation), functions (Changes to functions implementation), logical-expr (Logical plan and expressions), physical-expr (Changes to the physical-expr crates), sqllogictest (SQL Logic Tests (.slt))