
Generate GroupByHash output in multiple RecordBatches rather than one large one #9562

Open
@alamb

Is your feature request related to a problem or challenge?

  1. AggregateExec currently generates a single (giant) RecordBatch as its output (source)
  2. That batch is then emitted in parts via RecordBatch::slice(), which is zero-copy and does not allocate any additional memory (source)

This has at least two potential downsides:

  1. No memory is freed until the GroupByHash has emitted every output row
  2. As we see in Further refine the Top K sort operator #9417, if there are upstream operators like TopK that hold references to any of these sliced RecordBatches, those slices are treated as though they were an additional allocation that needs to be tracked (source)

Something like this in pictures:

                                                 Output             
                           ▲               RecordBatches are        
                           │                 slices into a          
                  ┌────────────────┐          single large          
                  │  RecordBatch   │─ ─ ─ ┐   output batch          
                  └────────────────┴ ─ ─ ┐                          
                           ▲              │                         
                           │             │        ┌────────────────┐
                  ┌────────────────┐      └ ─ ─ ─▶│                │
  output stream   │  RecordBatch   │     │        ├ ─ ─ ─ ─ ─ ─ ─ ─│
                  └────────────────┘      ─ ─ ─ ─▶│                │
                                                  ├ ─ ─ ─ ─ ─ ─ ─ ─│
                         ...                      │                │
                                                  │                │
                           ▲                      │      ...       │
                           │                      │                │
                  ┌────────────────┐              │                │
                  │  RecordBatch   ├ ─ ─ ┐        │                │
                  └────────────────┘              │                │
                           ▲             │        ├ ─ ─ ─ ─ ─ ─ ─ ─│
                           │              ─ ─ ─ ─▶│                │
                           │                      └────────────────┘
                           │                                        
               ┏━━━━━━━━━━━━━━━━━━━━━━━┓                            
               ┃                       ┃       Single RecordBatch   
               ┃                       ┃                            
               ┃                       ┃                            
               ┃                       ┃                            
               ┃                       ┃                            
               ┃    GroupByHashExec    ┃                            
               ┃                       ┃                            
               ┃                       ┃                            
               ┃                       ┃                            
               ┃                       ┃                            
               ┃                       ┃                            
               ┃                       ┃                            
               ┗━━━━━━━━━━━━━━━━━━━━━━━┛                            
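
To make the two downsides concrete, here is a minimal std-only sketch of the slicing pattern pictured above. It is a toy model, not the arrow or DataFusion API: `BatchSlice`, `slice_output`, and `charged_bytes` are hypothetical names, and a plain `Arc<Vec<i64>>` stands in for the single large output batch.

```rust
use std::sync::Arc;

/// Toy stand-in for a sliced RecordBatch: a view into one shared buffer.
/// (Hypothetical type, not the arrow `RecordBatch`.)
#[derive(Clone)]
struct BatchSlice {
    buffer: Arc<Vec<i64>>, // the single large backing allocation
    offset: usize,
    len: usize,
}

impl BatchSlice {
    /// The rows this slice exposes.
    fn values(&self) -> &[i64] {
        &self.buffer[self.offset..self.offset + self.len]
    }

    /// What a tracker that counts the backing buffer would charge for this
    /// slice: the whole buffer, not just the rows the slice exposes.
    fn charged_bytes(&self) -> usize {
        self.buffer.len() * std::mem::size_of::<i64>()
    }
}

/// Cut one large buffer into zero-copy slices of at most `batch_size` rows.
fn slice_output(buffer: Arc<Vec<i64>>, batch_size: usize) -> Vec<BatchSlice> {
    assert!(batch_size > 0, "batch_size must be non-zero");
    let total = buffer.len();
    (0..total)
        .step_by(batch_size)
        .map(|offset| BatchSlice {
            buffer: Arc::clone(&buffer), // bumps a refcount, copies no rows
            offset,
            len: batch_size.min(total - offset),
        })
        .collect()
}
```

In this model, the backing buffer is freed only when the last `BatchSlice` is dropped, so one slice retained by a consumer keeps every row alive, and each retained slice is charged the full buffer size.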

Describe the solution you'd like

If we had infinite time / engineering hours, I think a better approach would be to change GroupByHash so that it does not create a single giant contiguous RecordBatch.

Instead, it would be better if GroupByHash produced a Vec<RecordBatch> and then fed those batches out incrementally.

Doing this would allow the GroupByHash to release memory incrementally as it emits output. This is analogous to how @korowa made join output incremental in #8658.

Perhaps something like:

                            ▲                                        
                            │                                        
                   ┌────────────────┐                    Output      
  output stream    │  RecordBatch   │              RecordBatches are 
                   └────────────────┘              created in smaller
                            ▲                      chunks and emitted
                            │                          one by one    
                            │                                        
                            │                                        
                ┏━━━━━━━━━━━━━━━━━━━━━━━┓          ┌────────────────┐
                ┃                       ┃          │  RecordBatch   │
                ┃                       ┃          └────────────────┘
                ┃                       ┃          ┌────────────────┐
                ┃                       ┃          │  RecordBatch   │
                ┃                       ┃          └────────────────┘
                ┃    GroupByHashExec    ┃          ┌────────────────┐
                ┃                       ┃          │  RecordBatch   │
                ┃                       ┃          └────────────────┘
                ┃                       ┃                 ...        
                ┃                       ┃          ┌────────────────┐
                ┃                       ┃          │  RecordBatch   │
                ┃                       ┃          └────────────────┘
                ┗━━━━━━━━━━━━━━━━━━━━━━━┛                            
                                                    Vec<RecordBatch> 
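
As a rough illustration of the idea (not a proposed implementation), the sketch below models the output as independently allocated batches that are handed out one at a time. `ChunkedOutput`, `OwnedBatch`, and `buffered_rows` are hypothetical names, and a `Vec<i64>` stands in for a RecordBatch. A real operator would presumably build each batch directly from its aggregation state rather than copy from a slice as done here.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a RecordBatch that owns its own allocation.
type OwnedBatch = Vec<i64>;

/// Hypothetical output state: separately allocated batches, emitted front to back.
struct ChunkedOutput {
    pending: VecDeque<OwnedBatch>,
}

impl ChunkedOutput {
    /// Copy `rows` into separate batches of at most `batch_size` rows each.
    fn new(rows: &[i64], batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be non-zero");
        let pending = rows.chunks(batch_size).map(|c| c.to_vec()).collect();
        Self { pending }
    }

    /// Number of rows the operator is still holding on to.
    fn buffered_rows(&self) -> usize {
        self.pending.iter().map(Vec::len).sum()
    }
}

impl Iterator for ChunkedOutput {
    type Item = OwnedBatch;

    /// Hand the next batch to the caller. Ownership moves out, so the
    /// operator stops holding that batch's memory as soon as it is emitted.
    fn next(&mut self) -> Option<OwnedBatch> {
        self.pending.pop_front()
    }
}
```

Because each batch is its own allocation, a consumer that keeps one batch retains only that batch's rows, and the amount buffered by the operator shrinks with every call to `next`.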
                                                                     
                                                                     

Describe alternatives you've considered

No response

Additional context

@yjshen notes:

To improve AggExec's mono output pattern, #7065 might be similar to the idea of incremental output.

Labels: enhancement