
Analysis to support SortPreservingMerge --> ProgressiveEval #15191

Open
@alamb

This is a potential design to support

It is largely copy/paste from an internal design I wrote for a project at InfluxData

We are planning to propose upstreaming what we do, and @wiedld is working on this

I purposely wrote it in markdown to make it easier to copy/paste the diagrams and explanation into code.

Background:

📖 The following description uses the DataFusion definition of a partition, not the IOx one (how data is divided across files).

What is SortPreservingMerge?

SortPreservingMerge is a DataFusion operator that merges data row by row from multiple sorted input partitions. In order to produce any output rows, the SortPreservingMerge must open all of its inputs (e.g. it must open several parquet files). Here is a diagram:

┌─────────────────────────┐
│ ┌───┬───┬───┬───┐       │
│ │ A │ B │ C │ D │ ...   │──┐
│ └───┴───┴───┴───┘       │  │
└─────────────────────────┘  │  ┌───────────────────┐    ┌───────────────────────────────┐
  Partition 1                │  │                   │    │ ┌───┬───╦═══╦───┬───╦═══╗     │
                             ├─▶│SortPreservingMerge│───▶│ │ A │ B ║ B ║ C │ D ║ E ║ ... │
                             │  │                   │    │ └───┴───╩═══╩───┴───╩═══╝     │
┌─────────────────────────┐  │  └───────────────────┘    └───────────────────────────────┘
│ ╔═══╦═══╗               │  │
│ ║ B ║ E ║     ...       │──┘
│ ╚═══╩═══╝               │
└─────────────────────────┘
  Partition 2


 Input Partitions                                          Output Partition
   (sorted)                                                  (sorted)
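
As a rough illustration of the row-by-row merge described above, here is a minimal sketch using only the Rust standard library. It is not how DataFusion implements the operator (which works on streams of record batches); `merge_sorted` is a hypothetical name, and in-memory `Vec`s stand in for partitions.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted partitions into one sorted output, row by row.
/// Illustrative only: real operators work on streams of record batches.
fn merge_sorted<T: Ord + Clone>(partitions: &[Vec<T>]) -> Vec<T> {
    // The heap holds (next row, partition index, row index), one entry per
    // non-empty partition. Every partition must contribute its first row
    // before any output row can be produced.
    let mut heap = BinaryHeap::new();
    for (p, rows) in partitions.iter().enumerate() {
        if let Some(first) = rows.first() {
            heap.push(Reverse((first.clone(), p, 0usize)));
        }
    }

    let mut output = Vec::new();
    while let Some(Reverse((row, p, i))) = heap.pop() {
        output.push(row);
        // Refill the heap from the partition the row came from.
        if let Some(next) = partitions[p].get(i + 1) {
            heap.push(Reverse((next.clone(), p, i + 1)));
        }
    }
    output
}
```

With the two partitions from the diagram, `merge_sorted(&[vec!['A', 'B', 'C', 'D'], vec!['B', 'E']])` yields `A B B C D E`. Note the comparison on every output row, which is the per-row cost that ProgressiveEval avoids.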

What is ProgressiveEval?

ProgressiveEval is a special operator. See the blog post Making Most Recent Value Queries Hundreds of Times Faster for more details.

ProgressiveEval outputs, in order:

  1. all rows for its first input partition
  2. all rows for its second input partition
  3. all rows for its third input partition
  4. and so on.

Note that ProgressiveEval only starts [2 (configurable)] inputs at a time. Here is a diagram:

┌─────────────────────────┐
│ ┌───┬───┬───┬───┐       │
│ │ A │ B │ C │ D │       │──┐
│ └───┴───┴───┴───┘       │  │
└─────────────────────────┘  │  ┌───────────────────┐    ┌───────────────────────────────┐
  Stream 1                   │  │                   │    │ ┌───┬───╦═══╦───┬───╦═══╗     │
                             ├─▶│  ProgressiveEval  │───▶│ │ A │ B ║ C ║ D │ M ║ N ║ ... │
                             │  │                   │    │ └───┴─▲─╩═══╩───┴───╩═══╝     │
┌─────────────────────────┐  │  └───────────────────┘    └─┬─────┴───────────────────────┘
│ ╔═══╦═══╗               │  │
│ ║ M ║ N ║               │──┘                             │
│ ╚═══╩═══╝               │                Output is all rows from Stream 1 followed by all rows from Stream 2, etc
└─────────────────────────┘
  Stream 2


 Input Streams                                             Output stream
 (in some order)                                           (in same order)
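
The concatenation behaviour can be sketched as follows. This is a simplified model under stated assumptions: `progressive_eval` and the `open` callback are hypothetical names, inputs are opened strictly one at a time (the configurable prefetch of 2 inputs is ignored), and rows are plain values rather than record batches.

```rust
/// Concatenate inputs in order, opening each one only when it is needed.
/// Returns the output rows and how many inputs were opened.
/// `open` is a hypothetical stand-in for starting an input stream.
fn progressive_eval<T>(
    num_inputs: usize,
    mut open: impl FnMut(usize) -> Vec<T>,
    limit: usize,
) -> (Vec<T>, usize) {
    let mut output = Vec::new();
    let mut opened = 0;
    for i in 0..num_inputs {
        if output.len() >= limit {
            break; // limit satisfied: later inputs are never opened
        }
        opened += 1;
        let remaining = limit - output.len();
        // No comparison across inputs: rows are simply appended.
        output.extend(open(i).into_iter().take(remaining));
    }
    (output, opened)
}
```

With the two streams from the diagram and a limit of 3, the sketch returns `A B C` after opening only Stream 1; Stream 2 is never started.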

Why is ProgressiveEval better than SortPreservingMerge?

When possible, ProgressiveEval should be used instead of SortPreservingMerge because:

  1. It is faster and less CPU intensive (it doesn't need to compare individual rows across partitions)
  2. It is more efficient with limit (as it does not start all the input streams at once).

Under what conditions can SortPreservingMerge be converted to ProgressiveEval?

In order to convert a SortPreservingMerge (SPM) to ProgressiveEval, the plans must still produce the same results. We know all input partitions to the SPM are sorted on the sort expressions (this is required for correctness), and the output of the SPM will also be sorted on these expressions.

We define the "Lexical Space" as the space of all possible values of the sort expressions. For example, given data with a sort order of A ASC, B ASC (A ascending, B ascending), then the lexical space is all the unique combinations of (A, B). The "range" of an input in this lexical space is the minimum and maximum sort key values.

For example, for data like

a b
1 100
1 200
1 300
2 100
2 200
3 50

The lexical range is min --> max: (1,100) --> (3,50)
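
A minimal sketch of computing this range, assuming a two-column integer sort key `a ASC, b ASC`. The names `Key` and `lexical_range` are illustrative only. Rust tuples compare lexicographically, which matches the ordering used here.

```rust
/// Sort key for `a ASC, b ASC`; Rust tuples compare lexicographically.
type Key = (i64, i64);

/// Return the (min, max) sort key of a partition, or None if it is empty.
fn lexical_range(rows: &[Key]) -> Option<(Key, Key)> {
    let min = *rows.iter().min()?;
    let max = *rows.iter().max()?;
    Some((min, max))
}
```

For the rows above this returns `((1, 100), (3, 50))`. Because the partition is already sorted on the key, the minimum and maximum are simply its first and last rows; the sketch scans all rows only to avoid relying on that.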

Using a ProgressiveEval instead of SortPreservingMerge requires

  1. The input partitions' lexical ranges do not overlap
  2. The partitions are ordered by increasing lexical range

When this is the case, concatenating the partitions produces the same output as a sorted merge, and thus the outputs of ProgressiveEval and SortPreservingMerge are the same.
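
The two requirements can be sketched as a check over per-partition `(min, max)` ranges. Everything here is illustrative: `Key`, `Check` and `check_ranges` are hypothetical names, and treating ranges that merely share a boundary key as non-overlapping is an assumption (concatenating them still gives sorted output; a stricter check would use `<`).

```rust
type Key = (i64, i64);

#[derive(Debug, PartialEq)]
enum Check {
    /// Concatenating the inputs in the given order yields sorted output.
    Concatenable,
    /// Ranges are disjoint but not in increasing order.
    OutOfOrder,
    /// At least two ranges overlap.
    Overlapping,
}

/// Classify the (min, max) ranges of the inputs, given in input order.
fn check_ranges(ranges: &[(Key, Key)]) -> Check {
    // Requirement 1: no two ranges overlap (checked pairwise for clarity).
    for (i, a) in ranges.iter().enumerate() {
        for b in &ranges[i + 1..] {
            // Assumption: ranges sharing only a boundary key count as disjoint.
            let disjoint = a.1 <= b.0 || b.1 <= a.0;
            if !disjoint {
                return Check::Overlapping;
            }
        }
    }
    // Requirement 2: each range ends no later than the next one starts.
    if ranges.windows(2).all(|w| w[0].1 <= w[1].0) {
        Check::Concatenable
    } else {
        Check::OutOfOrder
    }
}
```

For instance, `check_ranges(&[((1, 100), (2, 100)), ((2, 200), (2, 200)), ((2, 300), (3, 100))])` returns `Check::Concatenable`, while swapping the last two ranges returns `Check::OutOfOrder`.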

Example: ProgressiveEval can be used instead of SortPreservingMerge

In the following example, the input streams have non-overlapping lexical ranges and are in order, and thus SortPreservingMerge and ProgressiveEval produce the same output:

  • Partition 1: (1,100) --> (2,100)
  • Partition 2: (2,200) --> (2,200)
  • Partition 3: (2,300) --> (3,100)
                                      ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                       
                                           ┏━━━━┳━━━━┓     │                Input streams:        
                                      │    ┃ A  ┃ B  ┃                      ✅ Non overlapping     
                                           ┣────╋────┫     │                ✅ Ordered             
                                      │    │ 1  │100 │                                            
                                           ├────┼────┤     │                                      
                                      │    │ 1  │200 │                                            
                                           ├────┼────┤     │                                      
                                      │    │ 2  │100 │                                            
                                           ├────┼────┤     │                                      
                                      │    │ 2  │200 │                                            
                                           ├────┼────┤     │                                      
                                      │    │ 2  │300 │                                            
                                           ├────┼────┤     │                                      
                                      │    │ 3  │100 │                                            
                                           └────┴────┘     │  Output                              
                                      └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                       
                                                ▲ ▲                                               
                     ┌──────────────────────────┘ └──────────────────────────┐                    
                     │                                                       │                    
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓              ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃          SortPreservingMerge           ┃              ┃            ProgressiveEval             ┃
┃         exprs = [a ASC, b ASC]         ┃              ┃                                        ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛              ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
                     ▲                                                       ▲                    
                     └──────────────────────────┐ ┌──────────────────────────┘                    
                                                │ │                                               
                          ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐                         
                                                                                                  
                          │  ┏━━━━┳━━━━┓    ┏━━━━┳━━━━┓    ┏━━━━┳━━━━┓  │                         
                             ┃ A  ┃ B  ┃    ┃ A  ┃ B  ┃    ┃ A  ┃ B  ┃                            
                          │  ┣────╋────┫    ┣────╋────┫    ┣────╋────┫  │                         
                             │ 1  │100 │    │ 2  │200 │    │ 2  │300 │                            
                          │  ├────┼────┤    └────┴────┘    ├────┼────┤  │                         
                             │ 1  │200 │                   │ 3  │100 │                            
                          │  ├────┼────┤                   └────┴────┘  │                         
                             │ 2  │100 │                                                          
                          │  └────┴────┘                                │                         
                                                                                                  
                          │  Partition 1    Partition 2    Partition 3  │                         
                           ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                          
                                                                            Input                 

Counter Example 1: Out of order Partitions

In the following example, the input partitions still have non-overlapping lexical ranges, but they are NOT in order. Therefore the output of ProgressiveEval (which concatenates the streams) differs from that of SortPreservingMerge, and in this case we can NOT use ProgressiveEval:

          ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                    
               ┏━━━━┳━━━━┓     │                                       ┏━━━━┳━━━━┓     │                   
          │    ┃ A  ┃ B  ┃                                        │    ┃ A  ┃ B  ┃       Input streams:    
               ┣────╋────┫     │                                       ┣────╋────┫     │ ✅ Non overlapping 
          │    │ 1  │100 │                                        │    │ 1  │100 │       ❌ Ordered         
               ├────┼────┤     │    Partition 3     Partition 3        ├────┼────┤     │                   
          │    │ 1  │200 │         comes before     comes after   │    │ 1  │200 │                         
               ├────┼────┤     │         │               │             ├────┼────┤     │                   
          │    │ 2  │100 │                                        │    │ 2  │100 │                         
               ╠════╬════╣     │         │               │             ├────┼────┤     │                   
          │    ║ 2  ║200 ║◀ ─ ─ ─ ─ ─ ─ ─                         │    │ 2  │300 │                         
               ╠────╬────╣     │                         │             ├────┼────┤     │                   
          │    │ 2  │300 │                                        │    │ 3  │100 │                         
               ├────┼────┤     │   Output                │             ╠────╬────╣     │                   
          │    │ 3  │100 │        (same as                ─ ─ ─ ─ ┼ ─ ▶║ 2  ║200 ║                         
               └────┴────┘     │   above)                              ╚════╩════╝     │   Output          
          └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                    
                     ▲                                                       ▲                             
                     │                                                       │                             
                     │                                                       │                             
                     │                                                       │                             
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓              ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓         
┃          SortPreservingMerge           ┃              ┃            ProgressiveEval             ┃         
┃         exprs = [a ASC, b ASC]         ┃              ┃                                        ┃         
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛              ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛         
                     ▲                                                       ▲                             
                     └──────────────────────────┐ ┌──────────────────────────┘                             
                                                │ │                                                        
                          ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐                                  
                                                                                                           
                          │  ┏━━━━┳━━━━┓    ┏━━━━┳━━━━┓    ┏━━━━┳━━━━┓  │                                  
                             ┃ A  ┃ B  ┃    ┃ A  ┃ B  ┃    ┃ A  ┃ B  ┃                                     
                          │  ┣────╋────┫    ┣────╋────┫    ┣────╋────┫  │                                  
                             │ 1  │100 │    │ 2  │300 │    │ 2  │200 │                                     
                          │  ├────┼────┤    ├────┼────┤    └────┴────┘  │                                  
                             │ 1  │200 │    │ 3  │100 │                                                    
                          │  ├────┼────┤    └────┴────┘                 │                                  
                             │ 2  │100 │                                      Input                        
                          │  └────┴────┘                                │                                  
                                                                           Note Partition 2 and            
                          │  Partition 1    Partition 2    Partition 3  │   Partition 3 are in             
                           ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─      different order              

Counter Example 2: Overlapping Partitions

When partitions do overlap in lexical range, it is clearer that the outputs of the two operators differ. When ProgressiveEval appends the input streams together, the result is not sorted, as shown in the following figure:

          ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                    
               ┏━━━━┳━━━━┓     │                                       ┏━━━━┳━━━━┓     │                   
          │    ┃ A  ┃ B  ┃                 Partition 3            │    ┃ A  ┃ B  ┃       Input streams:    
               ┣────╋────┫     │          comes before                 ┣────╋────┫     │ ❌ Non overlapping 
          │    │ 1  │100 │                      │                 │    │ 1  │100 │       ✅ Ordered         
               ├────┼────┤     │                                       ├────┼────┤     │                   
          │    │ 1  │200 │                      │                 │    │ 1  │200 │                         
               ├────┼────┤     │                                       ├────┼────┤     │                   
          │    │ 2  │200 │                      │        ┌ ─ ─ ─ ─│─ ─▶│ 2  │250 │                         
               ├────┼────┤     │                                       ├────┼────┤     │                   
          │    │ 2  │250 │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘        │        │    │ 2  │200 │                         
               ├────┼────┤     │                    Partition 3        ├────┼────┤     │                   
          │    │ 2  │300 │                          comes after   │    │ 2  │300 │                         
               ├────┼────┤     │                    Partition 2        ├────┼────┤     │                   
          │    │ 3  │100 │                                        │    │ 3  │100 │                         
               └────┴────┘     │                                       └────┴────┘     │   Output          
          └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                    
                     ▲                                                       ▲                             
                     │                                                       │                             
                     │                                                       │                             
                     │                                                       │                             
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓              ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓         
┃          SortPreservingMerge           ┃              ┃            ProgressiveEval             ┃         
┃         exprs = [a ASC, b ASC]         ┃              ┃                                        ┃         
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛              ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛         
                     ▲                                                       ▲                             
                     └──────────────────────────┐ ┌──────────────────────────┘                             
                                                │ │                                                        
                          ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐                                  
                                                                                                           
                          │  ┏━━━━┳━━━━┓    ┏━━━━┳━━━━┓    ┏━━━━┳━━━━┓  │                                  
                             ┃ A  ┃ B  ┃    ┃ A  ┃ B  ┃    ┃ A  ┃ B  ┃                                     
                          │  ┣────╋────┫    ┣────╋────┫    ┣━━━━╋────┫  │                                  
                             │ 1  │100 │    │ 2  │200 │    ┃ 2  │300 │                                     
                          │  ├────┼────┤    └────┴────┘    ┣────┼────┤  │        Input                     
                             │ 1  │200 │                   │ 3  │100 │                                     
                          │  ├────┼────┤                   └────┴────┘  │   Partition 2 overlaps           
                             │ 2  │250 │◀─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ with partition 1's            
                          │  └────┴────┘                                │       lexical range              
                                                                                                           
                          │  Partition 1    Partition 2    Partition 3  │                                  
                           ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                   

Proposed Algorithm

Step 1: Find min/max values of each sort key column for each input partition to a SortPreservingMerge.

We can not get this information reliably from DataFusion statistics yet due to

However, internally at InfluxData we have a special analysis that works for time.

We can also use the EquivalenceProperties::constants to determine min/max values for constants (what is needed for OrderUnionSortedInputsForConstants)

If we can't determine all min/max values ==> no transform
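
The all-or-nothing rule in this step can be sketched with plain Rust types. This is only an illustration under assumptions: `Key`, `all_ranges` and `constant_range` are hypothetical names, and an `Option` stands in for whatever statistics source supplies the min/max values.

```rust
type Key = (i64, i64);

/// Gather the per-partition ranges; a single unknown range yields None,
/// which corresponds to "no transform".
fn all_ranges(per_partition: &[Option<(Key, Key)>]) -> Option<Vec<(Key, Key)>> {
    per_partition.iter().copied().collect()
}

/// A sort key known to be constant within a partition has min == max.
fn constant_range(value: Key) -> (Key, Key) {
    (value, value)
}
```

Collecting an iterator of `Option`s into an `Option<Vec<_>>` short-circuits on the first `None`, so one partition with unknown bounds is enough to abandon the rewrite.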

Step 2: Determine if the Lexical Space overlaps

The algorithm converts the mins/maxes to Arrow arrays and then calls the rank kernel on them.

Details

/// Sort the given plans by their value ranges.
/// Returns `None` if
///    . the number of plans is not the same as the number of value ranges
///    . the value ranges overlap
/// Otherwise returns the sorted plans and the sorted value ranges.
fn sort_by_value_ranges<T>(
    plans: Vec<T>,
    value_ranges: Vec<(ScalarValue, ScalarValue)>,
    sort_options: SortOptions,
) -> Result<Option<SortedVecAndValueRanges<T>>> {
    if plans.len() != value_ranges.len() {
        trace!(
            plans.len = plans.len(),
            value_ranges.len = value_ranges.len(),
            "--------- number of plans is not the same as the number of value ranges"
        );
        return Ok(None);
    }

    if overlap(&value_ranges)? {
        trace!("--------- value ranges overlap");
        return Ok(None);
    }

    // get the min value of each value range
    let min_iter = value_ranges.iter().map(|(min, _)| min.clone());
    let mins = ScalarValue::iter_to_array(min_iter)?;

    // rank the min values
    let ranks = rank(&*mins, Some(sort_options))?;

    // no need to sort if it is already sorted
    if ranks.iter().zip(ranks.iter().skip(1)).all(|(a, b)| a <= b) {
        return Ok(Some(SortedVecAndValueRanges::new(plans, value_ranges)));
    }

    // sort the plans by the ranks of their min values
    let mut plan_rank_zip: Vec<(T, u32)> = plans
        .into_iter()
        .zip(ranks.iter().copied())
        .collect::<Vec<_>>();
    plan_rank_zip.sort_by(|(_, min1), (_, min2)| min1.cmp(min2));
    let plans = plan_rank_zip
        .into_iter()
        .map(|(plan, _)| plan)
        .collect::<Vec<_>>();

    // Sort the value ranges by the ranks of their min values
    let mut value_range_rank_zip: Vec<((ScalarValue, ScalarValue), u32)> =
        value_ranges.into_iter().zip(ranks).collect::<Vec<_>>();
    value_range_rank_zip.sort_by(|(_, min1), (_, min2)| min1.cmp(min2));
    let value_ranges = value_range_rank_zip
        .into_iter()
        .map(|(value_range, _)| value_range)
        .collect::<Vec<_>>();

    Ok(Some(SortedVecAndValueRanges::new(plans, value_ranges)))
}

We will need to order the inputs by minimum value and then ensure:

  1. All maxes are in ascending order
  2. max_{j} <= min_{i} for all inputs j < i (each input ends no later than the next one begins)

If the lexical ranges overlap ==> no transform
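
For multi-column sort keys, the order-then-check step can be sketched without Arrow. This is a plain-Rust analogue under assumptions, not the rank-kernel implementation shown above: tuples stand in for `ScalarValue`, `order_by_ranges` is a hypothetical name, each range is assumed to satisfy min <= max, and ranges that share only a boundary key are treated as non-overlapping.

```rust
type Key = (i64, i64);

/// Return the input indices ordered by range minimum, or None if any
/// two ranges overlap. Plain tuples stand in for ScalarValue here.
fn order_by_ranges(ranges: &[(Key, Key)]) -> Option<Vec<usize>> {
    let mut order: Vec<usize> = (0..ranges.len()).collect();
    // Order the inputs by (min, max) so that a single-key range sorts
    // before a wider range that starts at the same key.
    order.sort_by_key(|&i| ranges[i]);
    // After sorting by min, only neighbours need to be compared:
    // each max must not exceed the next min.
    let disjoint = order
        .windows(2)
        .all(|w| ranges[w[0]].1 <= ranges[w[1]].0);
    disjoint.then_some(order)
}
```

For ranges given in the order `(1,100)-->(2,100)`, `(2,300)-->(3,100)`, `(2,200)-->(2,200)` the sketch returns `Some(vec![0, 2, 1])`, i.e. the last two inputs must be swapped. If the first range is widened to `(1,100)-->(2,250)` it returns `None`.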

Step 3: Reorder inputs, if needed and possible

If the input partitions are non-overlapping, attempt to reorder the input partitions if needed.

It is possible to reorder input partitions for certain plans such as UnionExec and ParquetExec, but it is not possible to reorder the input partitions for others (like RepartitionExec).

If we cannot reorder the partitions into increasing lexical order ==> no transform
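
The decision in this step can be summarised in a small sketch. The `Rewrite` enum, `plan_rewrite`, and the `can_reorder` flag are hypothetical; how a real optimizer rule would detect reorderable children (e.g. UnionExec vs RepartitionExec) is not modelled here.

```rust
/// Outcome of the Step 3 decision (names are hypothetical).
#[derive(Debug, PartialEq)]
enum Rewrite {
    /// Inputs are already in increasing order: swap the operator only.
    UseAsIs,
    /// Inputs must be permuted into this order first.
    Reorder(Vec<usize>),
    /// Leave the SortPreservingMerge in place.
    NoTransform,
}

/// `order` is the input order sorted by range minimum (None if ranges overlap);
/// `can_reorder` says whether the child plan allows permuting its partitions.
fn plan_rewrite(order: Option<Vec<usize>>, can_reorder: bool) -> Rewrite {
    match order {
        None => Rewrite::NoTransform,
        // Identity permutation: nothing to reorder.
        Some(o) if o.iter().copied().eq(0..o.len()) => Rewrite::UseAsIs,
        Some(o) if can_reorder => Rewrite::Reorder(o),
        Some(_) => Rewrite::NoTransform,
    }
}
```

For example, `plan_rewrite(Some(vec![0, 2, 1]), false)` returns `Rewrite::NoTransform`, because the inputs are out of order and the child cannot be permuted.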
