Improve Memory usage + performance with large numbers of groups / High Cardinality Aggregates #6937

Closed
@alamb

Description

Is your feature request related to a problem or challenge?

When running a query with "high cardinality" grouping in DataFusion, memory usage increases linearly not only with the number of groups (expected) but also with the number of cores.

This is the root cause of @ychen7's observation that ClickBench q32 fails, as described in #5276 (comment).

To reproduce, get the ClickBench data https://github.com/ClickHouse/ClickBench/tree/main#data-loading and run this:

CREATE EXTERNAL TABLE hits
STORED AS PARQUET
LOCATION 'hits.parquet';

set datafusion.execution.target_partitions = 1;
SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10;

set datafusion.execution.target_partitions = 4;
SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10;

This is what the memory usage looks like:
[screenshot: memory usage of the two runs]

The reason for this behavior can be seen in the plan and the multi-stage hash grouping it uses:

explain SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10;
| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|               |   SortPreservingMergeExec: [c@2 DESC]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |     SortExec: fetch=10, expr=[c@2 DESC]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |       ProjectionExec: expr=[WatchID@0 as WatchID, ClientIP@1 as ClientIP, COUNT(UInt8(1))@2 as c, SUM(hits.IsRefresh)@3 as SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)@4 as AVG(hits.ResolutionWidth)]                                                                                                                                                                                                                                                                                                                                                       |
|               |         AggregateExec: mode=FinalPartitioned, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[COUNT(UInt8(1)), SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)]                                                                                                                                                                                                                                                                                                                                                                                        |
|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |             RepartitionExec: partitioning=Hash([WatchID@0, ClientIP@1], 16), input_partitions=16                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|               |               AggregateExec: mode=Partial, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[COUNT(UInt8(1)), SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)]                                                                                                                                                                                                                                                                                                                                                                                           |
|               |                 ParquetExec: file_groups={16 groups: [[Users/alamb/Software/clickbench_hits_compatible/hits.parquet:0..923748528], [Users/alamb/Software/clickbench_hits_compatible/hits.parquet:923748528..1847497056], [Users/alamb/Software/clickbench_hits_compatible/hits.parquet:1847497056..2771245584], [Users/alamb/Software/clickbench_hits_compatible/hits.parquet:2771245584..3694994112], [Users/alamb/Software/clickbench_hits_compatible/hits.parquet:3694994112..4618742640], ...]}, projection=[WatchID, ClientIP, IsRefresh, ResolutionWidth] |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Specifically, since the groups are arbitrarily distributed across the files, each AggregateExec: mode=Partial has to build a hash table with entries for all of the groups. As the number of target partitions goes up, the number of AggregateExec: mode=Partial instances goes up too, and thus so does the number of copies of the group data.

Each AggregateExec: mode=FinalPartitioned, by contrast, sees only a distinct subset of the keys. As the number of target partitions goes up there are more AggregateExec: mode=FinalPartitioned instances, but each sees a smaller and smaller subset of the group keys, so the total memory of the final stage stays roughly proportional to the number of groups.
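In rough numbers, here is a back-of-the-envelope sketch (the group cardinality below is made up, not measured from hits.parquet) of how the intermediate hash table entries scale with the number of target partitions:

// Illustrative only: models how many hash table entries each stage holds.
// The group count is a made-up number, not measured from the hits dataset.
fn main() {
    let num_groups: u64 = 10_000_000; // hypothetical distinct (WatchID, ClientIP) pairs
    for target_partitions in [1u64, 4, 16] {
        // Partial stage: each partition likely sees (almost) every group
        let partial_entries = num_groups * target_partitions;
        // FinalPartitioned stage: partitions see disjoint subsets, so the
        // total stays ~num_groups no matter how many partitions there are
        let final_entries = num_groups;
        println!("partitions={target_partitions}: partial entries ~{partial_entries}, final entries ~{final_entries}");
    }
}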

In pictures:

               ▲                          ▲                                                    
               │                          │                                                    
               │                          │                                                    
               │                          │                                                    
               │                          │                                                    
               │                          │                                                    
   ┌───────────────────────┐  ┌───────────────────────┐       4. Each AggregateMode::Final     
   │GroupBy                │  │GroupBy                │       GroupBy has an entry for its     
   │(AggregateMode::Final) │  │(AggregateMode::Final) │       subset of groups (in this case   
   │                       │  │                       │       that means half the entries)     
   └───────────────────────┘  └───────────────────────┘                                        
               ▲                          ▲                                                    
               │                          │                                                    
               └─────────────┬────────────┘                                                    
                             │                                                                 
                             │                                                                 
                             │                                                                 
                ┌─────────────────────────┐                   3. Repartitioning by hash(group  
                │       Repartition       │                   keys) ensures that each distinct 
                │         HASH(x)         │                   group key now appears in exactly 
                └─────────────────────────┘                   one partition                    
                             ▲                                                                 
                             │                                                                 
             ┌───────────────┴─────────────┐                                                   
             │                             │                                                   
             │                             │                                                   
┌─────────────────────────┐  ┌──────────────────────────┐     2. Each AggregateMode::Partial   
│         GroupBy         │  │         GroupBy          │     GroupBy has an entry for *all*
│(AggregateMode::Partial) │  │ (AggregateMode::Partial) │     the groups                       
└─────────────────────────┘  └──────────────────────────┘                                      
             ▲                             ▲                                                   
             │                            ┌┘                                                   
             │                            │                                                    
        .─────────.                  .─────────.                                               
     ,─'           '─.            ,─'           '─.                                            
    ;      Input      :          ;      Input      :          1. Since input data is           
    :   Partition 0   ;          :   Partition 1   ;          arbitrarily or RoundRobin        
     ╲               ╱            ╲               ╱           distributed, each partition      
      '─.         ,─'              '─.         ,─'            likely has all distinct          
          `───────'                    `───────'              group keys

Some example data:

              ┌─────┐                ┌─────┐                                                 
              │  1  │                │  3  │                                                 
              ├─────┤                ├─────┤                                                 
              │  2  │                │  4  │                After repartitioning by          
              └─────┘                └─────┘                hash(group keys), each distinct  
              ┌─────┐                ┌─────┐                group key now appears in exactly 
              │  1  │                │  3  │                one partition                    
              ├─────┤                ├─────┤                                                 
              │  2  │                │  4  │                                                 
              └─────┘                └─────┘                                                 
                                                                                             
                                                                                             
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                
                                                                                             
              ┌─────┐                ┌─────┐                                                 
              │  2  │                │  2  │                                                 
              ├─────┤                ├─────┤                                                 
              │  1  │                │  2  │                                                 
              ├─────┤                ├─────┤                                                 
              │  3  │                │  3  │                                                 
              ├─────┤                ├─────┤                                                 
              │  4  │                │  1  │                                                 
              └─────┘                └─────┘                Input data is arbitrarily or     
                ...                    ...                  RoundRobin distributed, each     
              ┌─────┐                ┌─────┐                partition likely has all         
              │  1  │                │  4  │                distinct group keys              
              ├─────┤                ├─────┤                                                 
              │  4  │                │  3  │                                                 
              ├─────┤                ├─────┤                                                 
              │  1  │                │  1  │                                                 
              ├─────┤                ├─────┤                                                 
              │  4  │                │  3  │                                                 
              └─────┘                └─────┘                                                 
                                                                                             
          group values           group values                                                
          in partition 0         in partition 1                                              
                                                                                             

Describe the solution you'd like

TLDR: I would like to propose updating the AggregateExec: mode=Partial operators to emit the contents of their hash tables once they see more than some fixed number of groups (I think @mingmwang said DuckDB uses a value of 10,000 for this).

This approach bounds the memory usage of the partial stage (to roughly some fixed constant * the number of target partitions) and should also perform quite well.

In the literature I think this approach could be called "dynamic partitioning", as it switches strategies based on the actual cardinality of the groups in the dataset.
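As a rough illustration of the idea (hypothetical types, field names, and threshold; not DataFusion's actual AggregateExec implementation), the partial aggregate would flush its accumulated state downstream whenever the group count crosses the threshold, relying on the FinalPartitioned stage to merge the duplicate partial results:

// Hypothetical sketch only: names like GroupState and emit_threshold are
// made up for illustration and are not DataFusion APIs.
use std::collections::HashMap;

struct GroupState {
    count: u64,
    sum: f64,
}

struct PartialAggregate {
    // keyed by the group values, e.g. (WatchID, ClientIP)
    groups: HashMap<(u64, u64), GroupState>,
    // e.g. 10_000, the value DuckDB reportedly uses
    emit_threshold: usize,
}

impl PartialAggregate {
    /// Update one input row; if the table has grown past the threshold,
    /// return the accumulated partial state so it can be emitted downstream.
    fn update(&mut self, key: (u64, u64), value: f64) -> Option<Vec<((u64, u64), GroupState)>> {
        let state = self
            .groups
            .entry(key)
            .or_insert(GroupState { count: 0, sum: 0.0 });
        state.count += 1;
        state.sum += value;

        if self.groups.len() >= self.emit_threshold {
            // Emit and clear: the FinalPartitioned stage merges duplicates
            // across emissions, so correctness is preserved while this
            // operator's memory stays bounded.
            Some(self.groups.drain().collect())
        } else {
            None
        }
    }
}

fn main() {
    let mut agg = PartialAggregate { groups: HashMap::new(), emit_threshold: 10_000 };
    if let Some(batch) = agg.update((1, 42), 1024.0) {
        println!("flushing {} partial groups early", batch.len());
    }
}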

Describe alternatives you've considered

One potential alternative is simply to hash repartition the input before the AggregateExec: mode=Partial.

This approach would definitely reduce the memory requirements, but it would mean hash repartitioning all of the input rows, so the number of input values that need to be hashed / copied would likely be much higher (at least as long as the group hasher and the hash repartitioner can't share hashes, which is the case today).

The current strategy actually works very well for low cardinality group bys, because the AggregateExec: mode=Partial reduces the intermediate result that needs to be hash repartitioned to a very small size.
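In rough, made-up numbers, this is the trade-off between the two plan shapes at the RepartitionExec:

// Illustrative only: compares how many rows get hashed/copied at the
// RepartitionExec under the two strategies. All numbers are made up.
fn main() {
    let input_rows: u64 = 100_000_000; // N: rows scanned from the input
    let partitions: u64 = 16;          // P: target_partitions

    for num_groups in [1_000u64, 10_000_000] {
        // Alternative: hash repartition the raw input before the partial aggregate
        let rows_hashed_alternative = input_rows;
        // Current plan: only the partial aggregates' output is repartitioned,
        // at most ~num_groups rows per input partition
        let rows_hashed_current = (num_groups * partitions).min(input_rows);
        println!("G={num_groups}: alternative hashes {rows_hashed_alternative} rows, current plan hashes ~{rows_hashed_current} rows");
    }
}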

Additional context

We saw this in IOx while working on tracing queries that look very similar to the ClickBench query, something like the following to get the top ten traces:

SELECT trace_id, max(time)
FROM traces
GROUP BY trace_id
ORDER BY max(time)
LIMIT 10;
