Is your feature request related to a problem or challenge?
- The AggregateExec generates one single (giant) RecordBatch on output (source)
- which is then emitted in parts via RecordBatch::slice(), which does not actually allocate any additional memory (source)
This has at least two potential downsides:
- No memory is freed until the GroupByHash has output every output row
- As we see in Further refine the Top K sort operator #9417, if there are upstream operators like TopK that hold references to any of these sliced RecordBatches, those slices are treated as though they were an additional allocation that needs to be tracked (source)
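Both downsides can be modeled in a few lines of std-only Rust. The Slice type and the helper functions are hypothetical. Charging each retained slice for its whole backing buffer is an assumption meant to mimic the accounting problem described in #9417, not DataFusion's actual memory-tracking code. The sketch shows that the giant buffer stays alive while any slice is retained, and that a tracker charging every retained slice for the full buffer over-counts.

```rust
use std::sync::{Arc, Weak};

/// Hypothetical model of one output part: a window into a shared buffer.
#[derive(Clone)]
pub struct Slice {
    buffer: Arc<Vec<u8>>,
    offset: usize,
    len: usize,
}

impl Slice {
    /// Byte range of the shared buffer that this slice actually covers.
    pub fn range(&self) -> std::ops::Range<usize> {
        self.offset..self.offset + self.len
    }

    /// Bytes the slice logically needs.
    pub fn logical_bytes(&self) -> usize {
        self.len
    }

    /// Bytes kept alive by the slice: the entire backing buffer.
    pub fn backing_bytes(&self) -> usize {
        self.buffer.len()
    }
}

/// Build one giant buffer and cut it into `part`-byte slices. Also returns a
/// Weak handle so callers can observe when the buffer is actually freed.
pub fn slice_all(total_bytes: usize, part: usize) -> (Weak<Vec<u8>>, Vec<Slice>) {
    assert!(part > 0, "part must be positive");
    let buffer = Arc::new(vec![0u8; total_bytes]);
    let weak = Arc::downgrade(&buffer);
    let slices: Vec<Slice> = (0..total_bytes)
        .step_by(part)
        .map(|offset| Slice {
            buffer: Arc::clone(&buffer),
            offset,
            len: part.min(total_bytes - offset),
        })
        .collect();
    // `buffer` is dropped on return; from then on only the slices keep it alive.
    (weak, slices)
}

/// Assumed naive accounting: charge every retained slice for its full backing buffer.
pub fn naive_tracked_bytes(retained: &[Slice]) -> usize {
    retained.iter().map(Slice::backing_bytes).sum()
}
```

With ten 100-byte slices of a 1000-byte buffer, the naive total is 10,000 bytes even though only 1,000 bytes are allocated. Keeping even one slice keeps the whole 1,000-byte buffer alive.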
Something like this in pictures:
[Diagram: GroupByHashExec produces a Single RecordBatch. The output RecordBatches on the output stream are slices into this single large output batch.]
Describe the solution you'd like
If we had infinite time / engineering hours, I think a better approach would actually be to change GroupByHash so it doesn't create a single giant contiguous RecordBatch.
Instead, it would be better if GroupByHash produced a Vec<RecordBatch> and then incrementally fed those batches out.
Doing this would allow GroupByHash to release memory incrementally as it produces output. This is analogous to how @korowa made join output incremental in #8658.
Perhaps something like this:
[Diagram: GroupByHashExec holds a Vec<RecordBatch>. The output RecordBatches are created in smaller chunks and emitted one by one on the output stream.]
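The proposed shape can be sketched in std-only Rust under simplifying assumptions. Batch (a plain Vec<i64>) stands in for RecordBatch, and IncrementalOutput is a hypothetical name, not an existing DataFusion type. Groups are accumulated directly into batch_size-row chunks, so there is never one contiguous output buffer, and each chunk is moved out to the consumer instead of being sliced.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a RecordBatch: an owned chunk of group values.
pub type Batch = Vec<i64>;

/// Sketch of a group-by output stage that holds a queue of smaller batches
/// (the Vec<RecordBatch> idea) instead of one giant batch.
pub struct IncrementalOutput {
    pending: VecDeque<Batch>,
}

impl IncrementalOutput {
    /// Accumulate groups straight into `batch_size`-row chunks; each chunk
    /// is its own allocation, not a view into a shared buffer.
    pub fn new<I: IntoIterator<Item = i64>>(groups: I, batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be positive");
        let mut pending = VecDeque::new();
        let mut current = Vec::with_capacity(batch_size);
        for g in groups {
            current.push(g);
            if current.len() == batch_size {
                // Move the full chunk into the queue and start a fresh one.
                pending.push_back(std::mem::replace(
                    &mut current,
                    Vec::with_capacity(batch_size),
                ));
            }
        }
        if !current.is_empty() {
            pending.push_back(current);
        }
        IncrementalOutput { pending }
    }

    /// Rows still held by the operator, i.e. memory not yet handed off.
    pub fn buffered_rows(&self) -> usize {
        self.pending.iter().map(Vec::len).sum()
    }
}

impl Iterator for IncrementalOutput {
    type Item = Batch;

    fn next(&mut self) -> Option<Batch> {
        // Ownership moves to the consumer; once it drops the batch, that
        // memory is freed regardless of the batches still queued here.
        self.pending.pop_front()
    }
}
```

Because next() moves each chunk out of the queue, buffered_rows() shrinks as batches are emitted, which is the incremental release of memory the proposal is after. Building chunks while iterating, rather than building one large Vec and splitting it afterwards, avoids ever materializing the giant buffer.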
Describe alternatives you've considered
No response
Additional context
@yjshen notes:
To improve AggExec's mono output pattern, #7065 might be similar to the idea of incremental output.