Description
- Part of [EPIC] Avoid sort for already sorted Parquet files that do not overlap values on condition #6672
This is a potential design to support:
- [EPIC] Avoid sort for already sorted Parquet files that do not overlap values on condition #6672
- Specifically feat: Add ProgressiveEval operator #10490
It is largely copy/paste from an internal design I wrote for a project at InfluxData.
We are planning to propose upstreaming what we do, and @wiedld is working on this.
I purposely wrote it in markdown to make it easier to copy/paste the diagrams and explanation into code.
Background:
📖 The following description uses the DataFusion definition of a partition, not the IOx one (how data is divided across files)
What is SortPreservingMerge?
SortPreservingMerge is a DataFusion operator that merges data row by row from multiple sorted input partitions. In order to produce any output rows, the SortPreservingMerge must open all its inputs (e.g. must open several parquet files). Here is a diagram:
┌─────────────────────────┐
│ ┌───┬───┬───┬───┐       │
│ │ A │ B │ C │ D │ ...   │──┐
│ └───┴───┴───┴───┘       │  │
└─────────────────────────┘  │  ┌───────────────────┐    ┌───────────────────────────────┐
  Partition 1                │  │                   │    │ ┌───┬───╦═══╦───┬───╦═══╗     │
                             ├─▶│SortPreservingMerge│───▶│ │ A │ B ║ B ║ C │ D ║ E ║ ... │
                             │  │                   │    │ └───┴───╩═══╩───┴───╩═══╝     │
┌─────────────────────────┐  │  └───────────────────┘    └───────────────────────────────┘
│ ╔═══╦═══╗               │  │
│ ║ B ║ E ║     ...       │──┘
│ ╚═══╩═══╝               │
└─────────────────────────┘
  Partition 2
 Input Partitions                                          Output Partition
   (sorted)                                                  (sorted)
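To make the "must open all its inputs" point concrete, here is a minimal sketch of a row-by-row merge using only the Rust standard library. It is not the DataFusion implementation: the function name and the use of in-memory `Vec`s in place of streams are assumptions for illustration.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted partitions into one sorted output, row by row.
/// Note that every input is polled for its first row before any output
/// row can be produced.
fn sort_preserving_merge<T: Ord>(partitions: Vec<Vec<T>>) -> Vec<T> {
    let mut inputs: Vec<_> = partitions.into_iter().map(|p| p.into_iter()).collect();

    // Min-heap of (next row, index of the partition it came from).
    let mut heap = BinaryHeap::new();
    for (idx, input) in inputs.iter_mut().enumerate() {
        if let Some(row) = input.next() {
            heap.push(Reverse((row, idx)));
        }
    }

    let mut output = Vec::new();
    while let Some(Reverse((row, idx))) = heap.pop() {
        output.push(row);
        // Refill the heap from the partition that just produced a row.
        if let Some(next) = inputs[idx].next() {
            heap.push(Reverse((next, idx)));
        }
    }
    output
}
```

Calling it with the two partitions from the diagram, `[A, B, C, D]` and `[B, E]`, gives `[A, B, B, C, D, E]`. Each output row costs a comparison across partitions, which is the CPU overhead discussed later.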
What is ProgressiveEval?
ProgressiveEval is a special operator that concatenates its inputs instead of merging them. See the blog post Making Most Recent Value Queries Hundreds of Times Faster for more details.
ProgressiveEval outputs, in order:
- all rows for its first input partition
- all rows for its second input partition
- all rows for its third input partition
- and so on.
Note that ProgressiveEval only starts [2 (configurable)] inputs at a time. Here is a diagram:
┌─────────────────────────┐
│ ┌───┬───┬───┬───┐       │
│ │ A │ B │ C │ D │       │──┐
│ └───┴───┴───┴───┘       │  │
└─────────────────────────┘  │  ┌───────────────────┐    ┌───────────────────────────────┐
  Stream 1                   │  │                   │    │ ┌───┬───╦═══╦───┬───╦═══╗     │
                             ├─▶│  ProgressiveEval  │───▶│ │ A │ B ║ C ║ D │ M ║ N ║ ... │
                             │  │                   │    │ └───┴─▲─╩═══╩───┴───╩═══╝     │
┌─────────────────────────┐  │  └───────────────────┘    └─┬─────┴───────────────────────┘
│ ╔═══╦═══╗               │  │
│ ║ M ║ N ║               │──┘                             │
│ ╚═══╩═══╝               │                Output is all rows from Stream 1 followed by all rows from Stream 2, etc
└─────────────────────────┘
  Stream 2
 Input Streams                                             Output stream
(in some order)                                           (in same order)
Why is ProgressiveEval better than SortPreservingMerge?
When possible, ProgressiveEval should be used instead of SortPreservingMerge because:
- It is faster and less CPU intensive (it doesn't need to compare individual rows across partitions)
- It is more efficient with limit (as it does not start all the input streams at once).
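The limit benefit can be illustrated with a small sketch. It models each input as a closure that "opens" the partition when called, and counts how many inputs were actually opened. The names are hypothetical, and unlike the real operator (which starts a configurable number of inputs ahead) this sketch opens strictly one input at a time.

```rust
/// Concatenate inputs in order, opening each one only when it is needed,
/// and stop once `limit` rows have been produced.
/// Returns the output rows and the number of inputs that were opened.
fn progressive_eval<T, F>(inputs: Vec<F>, limit: usize) -> (Vec<T>, usize)
where
    F: FnOnce() -> Vec<T>,
{
    let mut output = Vec::new();
    let mut opened = 0;
    for open in inputs {
        if output.len() >= limit {
            break; // limit satisfied: the remaining inputs are never opened
        }
        opened += 1;
        let remaining = limit - output.len();
        output.extend(open().into_iter().take(remaining));
    }
    (output, opened)
}
```

With the streams `[A, B, C, D]` and `[M, N]` from the diagram and a limit of 3, only the first stream is opened. A merge would have had to open both before producing its first row.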
Under what conditions can SortPreservingMerge be converted to ProgressiveEval?
In order to convert a SortPreservingMerge (SPM) to ProgressiveEval, the plans must still produce the same results. We know all input partitions to the SPM are sorted on the sort expressions (this is required for correctness) and the output of the SPM will also be sorted on these expressions.
We define the "Lexical Space" as the space of all possible values of the sort expressions. For example, given data with a sort order of A ASC, B ASC (A ascending, B ascending), the lexical space is all the unique combinations of (A, B). The "range" of an input in this lexical space is the pair of its minimum and maximum sort key values.
For example, for data like
a | b |
---|---|
1 | 100 |
1 | 200 |
1 | 300 |
2 | 100 |
2 | 200 |
3 | 50 |
The lexical range is min --> max: (1,100) --> (3,50)
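As a quick check of that range, here is a sketch that relies on Rust tuples comparing lexicographically, which matches a multi-column ascending sort. The function name and the use of `(i64, i64)` rows are assumptions for illustration.

```rust
/// Return the (min, max) sort key of a set of rows under `a ASC, b ASC`,
/// or None if there are no rows.
fn lexical_range(rows: &[(i64, i64)]) -> Option<((i64, i64), (i64, i64))> {
    // Tuples compare on the first field, then the second: (2, 200) < (3, 50).
    let min = rows.iter().copied().min()?;
    let max = rows.iter().copied().max()?;
    Some((min, max))
}
```

Note the max key is (3,50) even though the largest value of b on its own is 300: the range is over whole sort keys, not over each column independently.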
Using a ProgressiveEval instead of SortPreservingMerge requires:
- The input partitions' lexical ranges do not overlap
- The partitions are ordered in increasing key space
When this is the case, concatenating such partitions together results in the same output as a sorted stream, and thus the output of ProgressiveEval and SortPreservingMerge are the same.
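Both requirements can be checked in one pass over the ranges in their given order. The sketch below is a simplification with a hypothetical function name. It assumes each range is (min, max) with min <= max, and it treats two ranges that merely touch at a boundary key as non-overlapping (concatenating them still yields sorted output).

```rust
/// A lexical range: (min key, max key), inclusive on both ends.
type Range<K> = (K, K);

/// Returns true if the partitions, taken in the given order, can simply be
/// concatenated: each partition's max key is <= the next partition's min key.
fn can_use_progressive_eval<K: Ord>(ranges: &[Range<K>]) -> bool {
    // Checking neighbors is enough: if every range ends before the next one
    // starts, the whole sequence is both ordered and non-overlapping.
    ranges.windows(2).all(|pair| pair[0].1 <= pair[1].0)
}
```

For instance, `[((1,100),(2,100)), ((2,200),(2,200)), ((2,300),(3,100))]` passes, while swapping the last two ranges, or widening the first range to end at (2,250), makes the check fail.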
Example: ProgressiveEval can be used instead of a SortPreservingMerge
In the following example, the input streams have non-overlapping lexical ranges that are in order, and thus SortPreservingMerge and ProgressiveEval produce the same output
- Partition 1: (1,100) --> (2,100)
- Partition 2: (2,200) --> (2,200)
- Partition 3: (2,300) --> (3,100)
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
┏━━━━┳━━━━┓ │ Input streams:
│ ┃ A ┃ B ┃ ✅ Non overlapping
┣────╋────┫ │ ✅ Ordered
│ │ 1 │100 │
├────┼────┤ │
│ │ 1 │200 │
├────┼────┤ │
│ │ 2 │100 │
├────┼────┤ │
│ │ 2 │200 │
├────┼────┤ │
│ │ 2 │300 │
├────┼────┤ │
│ │ 3 │100 │
└────┴────┘ │ Output
└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
▲ ▲
┌──────────────────────────┘ └──────────────────────────┐
│ │
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ SortPreservingMerge ┃ ┃ ProgressiveEval ┃
┃ exprs = [a ASC, b ASC] ┃ ┃ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
▲ ▲
└──────────────────────────┐ ┌──────────────────────────┘
│ │
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
│ ┏━━━━┳━━━━┓ ┏━━━━┳━━━━┓ ┏━━━━┳━━━━┓ │
┃ A ┃ B ┃ ┃ A ┃ B ┃ ┃ A ┃ B ┃
│ ┣────╋────┫ ┣────╋────┫ ┣────╋────┫ │
│ 1 │100 │ │ 2 │200 │ │ 2 │300 │
│ ├────┼────┤ └────┴────┘ ├────┼────┤ │
│ 1 │200 │ │ 3 │100 │
│ ├────┼────┤ └────┴────┘ │
│ 2 │100 │
│ └────┴────┘ │
│ Partition 1 Partition 2 Partition 3 │
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
Input
Counter Example 1: Out of order Partitions
In the following example, the input partitions still have non-overlapping lexical ranges, but they are NOT in order. Therefore the output of the ProgressiveEval (which concatenates the streams) is different from that of SortPreservingMerge, and thus in this case we can NOT use ProgressiveEvalExec:
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
┏━━━━┳━━━━┓ │ ┏━━━━┳━━━━┓ │
│ ┃ A ┃ B ┃ │ ┃ A ┃ B ┃ Input streams:
┣────╋────┫ │ ┣────╋────┫ │ ✅ Non overlapping
│ │ 1 │100 │ │ │ 1 │100 │ ❌ Ordered
├────┼────┤ │ Partition 3 Partition 3 ├────┼────┤ │
│ │ 1 │200 │ comes before comes after │ │ 1 │200 │
├────┼────┤ │ │ │ ├────┼────┤ │
│ │ 2 │100 │ │ │ 2 │100 │
╠════╬════╣ │ │ │ ├────┼────┤ │
│ ║ 2 ║200 ║◀ ─ ─ ─ ─ ─ ─ ─ │ │ 2 │300 │
╠────╬────╣ │ │ ├────┼────┤ │
│ │ 2 │300 │ │ │ 3 │100 │
├────┼────┤ │ Output │ ╠────╬────╣ │
│ │ 3 │100 │ (same as ─ ─ ─ ─ ┼ ─ ▶║ 2 ║200 ║
└────┴────┘ │ above) ╚════╩════╝ │ Output
└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
▲ ▲
│ │
│ │
│ │
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ SortPreservingMerge ┃ ┃ ProgressiveEval ┃
┃ exprs = [a ASC, b ASC] ┃ ┃ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
▲ ▲
└──────────────────────────┐ ┌──────────────────────────┘
│ │
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
│ ┏━━━━┳━━━━┓ ┏━━━━┳━━━━┓ ┏━━━━┳━━━━┓ │
┃ A ┃ B ┃ ┃ A ┃ B ┃ ┃ A ┃ B ┃
│ ┣────╋────┫ ┣────╋────┫ ┣────╋────┫ │
│ 1 │100 │ │ 2 │300 │ │ 2 │200 │
│ ├────┼────┤ ├────┼────┤ └────┴────┘ │
│ 1 │200 │ │ 3 │100 │
│ ├────┼────┤ └────┴────┘ │
│ 2 │100 │ Input
│ └────┴────┘ │
Note Partition 2 and
│ Partition 1 Partition 2 Partition 3 │ Partition 3 are in
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ different order
Counter Example 2: Overlapping Partitions
When we have partitions that do overlap in lexical ranges, it is clearer that the output of the two operators is different. When ProgressiveEval appends the input streams together, the result will not be sorted, as shown in the following figure
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
┏━━━━┳━━━━┓ │ ┏━━━━┳━━━━┓ │
│ ┃ A ┃ B ┃ Partition 3 │ ┃ A ┃ B ┃ Input streams:
┣────╋────┫ │ comes before ┣────╋────┫ │ ❌ Non overlapping
│ │ 1 │100 │ │ │ │ 1 │100 │ ✅ Ordered
├────┼────┤ │ ├────┼────┤ │
│ │ 1 │200 │ │ │ │ 1 │200 │
├────┼────┤ │ ├────┼────┤ │
│ │ 2 │200 │ │ ┌ ─ ─ ─ ─│─ ─▶│ 2 │250 │
├────┼────┤ │ ├────┼────┤ │
│ │ 2 │250 │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ │ │ │ 2 │200 │
├────┼────┤ │ Partition 3 ├────┼────┤ │
│ │ 2 │300 │ comes after │ │ 2 │300 │
├────┼────┤ │ Partition 2 ├────┼────┤ │
│ │ 3 │100 │ │ │ 3 │100 │
└────┴────┘ │ └────┴────┘ │ Output
└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
▲ ▲
│ │
│ │
│ │
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ SortPreservingMerge ┃ ┃ ProgressiveEval ┃
┃ exprs = [a ASC, b ASC] ┃ ┃ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
▲ ▲
└──────────────────────────┐ ┌──────────────────────────┘
│ │
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
│ ┏━━━━┳━━━━┓ ┏━━━━┳━━━━┓ ┏━━━━┳━━━━┓ │
┃ A ┃ B ┃ ┃ A ┃ B ┃ ┃ A ┃ B ┃
│ ┣────╋────┫ ┣────╋────┫ ┣━━━━╋────┫ │
│ 1 │100 │ │ 2 │200 │ ┃ 2 │300 │
│ ├────┼────┤ └────┴────┘ ┣────┼────┤ │ Input
│ 1 │200 │ │ 3 │100 │
│ ├────┼────┤ └────┴────┘ │ Partition 2 overlaps
│ 2 │250 │◀─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ with partition 1's
│ └────┴────┘ │ lexical range
│ Partition 1 Partition 2 Partition 3 │
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
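The example and the two counter examples can be replayed on the actual rows from the figures. This is only a sketch: `concat` stands in for what ProgressiveEval produces, and `merged` stands in for SortPreservingMerge by sorting all the rows (which gives the same result as a merge when every input is already sorted). Both names are hypothetical.

```rust
type Row = (i64, i64);

/// Concatenate partitions in the given order (what ProgressiveEval produces).
fn concat(partitions: &[Vec<Row>]) -> Vec<Row> {
    partitions.iter().flatten().copied().collect()
}

/// The sorted output a merge of sorted inputs would produce
/// (computed here by sorting everything, for brevity).
fn merged(partitions: &[Vec<Row>]) -> Vec<Row> {
    let mut rows = concat(partitions);
    rows.sort();
    rows
}
```

Comparing `concat(..)` with `merged(..)` for each set of partitions shows equality only in the first example. In both counter examples the concatenation has a row that sorts before the row preceding it.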
Proposed Algorithm
Step 1: Find min/max values for each sort key column for each input partition to a SortPreservingMerge.
We can not get this information reliably from DataFusion statistics yet due to
However, internally at Influx we have a special analysis that works for time.
We can also use the EquivalenceProperties::constants to determine min/max values for constants (what is needed for OrderUnionSortedInputsForConstants)
If we can't determine all min/max values ==> no transform
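The all-or-nothing rule in Step 1 could look something like the sketch below. It does not use any DataFusion statistics API: the per-partition statistics are modelled as a plain `Option<(min, max)>`, and both function names are made up for illustration.

```rust
/// A column known to be constant for a partition has min == max.
fn constant_range<K: Clone>(value: K) -> (K, K) {
    (value.clone(), value)
}

/// Gather the (min, max) range of every input partition.
/// Returns None (meaning "no transform") if any partition's range is unknown.
fn collect_ranges<K: Clone>(stats: &[Option<(K, K)>]) -> Option<Vec<(K, K)>> {
    // Collecting into an Option yields None as soon as one element is None.
    stats.iter().cloned().collect()
}
```

A single partition with unknown min/max is enough to abandon the rewrite, since we could no longer prove the ranges do not overlap.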
Step 2: Determine if the Lexical Space overlaps
The algorithm converts the mins/maxes to arrow arrays and then calls the rank kernel on them
Details
/// Sort the given vector by the given value ranges
/// Return none if
///    . the number of plans is not the same as the number of value ranges
///    . the value ranges overlap
/// Return the sorted plans and the sorted value ranges
fn sort_by_value_ranges<T>(
    plans: Vec<T>,
    value_ranges: Vec<(ScalarValue, ScalarValue)>,
    sort_options: SortOptions,
) -> Result<Option<SortedVecAndValueRanges<T>>> {
    if plans.len() != value_ranges.len() {
        trace!(
            plans.len = plans.len(),
            value_ranges.len = value_ranges.len(),
            "--------- number of plans is not the same as the number of value ranges"
        );
        return Ok(None);
    }

    if overlap(&value_ranges)? {
        trace!("--------- value ranges overlap");
        return Ok(None);
    }

    // get the min value of each value range
    let min_iter = value_ranges.iter().map(|(min, _)| min.clone());
    let mins = ScalarValue::iter_to_array(min_iter)?;

    // rank the min values
    let ranks = rank(&*mins, Some(sort_options))?;

    // no need to sort if it is already sorted
    if ranks.iter().zip(ranks.iter().skip(1)).all(|(a, b)| a <= b) {
        return Ok(Some(SortedVecAndValueRanges::new(plans, value_ranges)));
    }

    // sort the plans by the ranks of their min values
    let mut plan_rank_zip: Vec<(T, u32)> = plans
        .into_iter()
        .zip(ranks.iter().copied())
        .collect::<Vec<_>>();
    plan_rank_zip.sort_by(|(_, min1), (_, min2)| min1.cmp(min2));
    let plans = plan_rank_zip
        .into_iter()
        .map(|(plan, _)| plan)
        .collect::<Vec<_>>();

    // sort the value ranges by the ranks of their min values
    let mut value_range_rank_zip: Vec<((ScalarValue, ScalarValue), u32)> =
        value_ranges.into_iter().zip(ranks).collect::<Vec<_>>();
    value_range_rank_zip.sort_by(|(_, min1), (_, min2)| min1.cmp(min2));
    let value_ranges = value_range_rank_zip
        .into_iter()
        .map(|(value_range, _)| value_range)
        .collect::<Vec<_>>();

    Ok(Some(SortedVecAndValueRanges::new(plans, value_ranges)))
}
We will need to order the inputs by minimum value and then ensure:
- All maxes are in ascending order
- max_{j} <= min_{i} for all inputs i > j (no input starts before an earlier input ends)
If the lexical space overlaps ==> no transform
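The `overlap` helper called in the code above is not shown here, so the following is only one possible way to write such a check, not the internal implementation. It uses a generic `Ord` key instead of `ScalarValue`, assumes ascending sort options, and treats ranges that touch at a single key as non-overlapping. All three are assumptions.

```rust
/// Returns true if any two (min, max) ranges overlap, regardless of the
/// order in which the ranges are given.
fn overlap<K: Ord + Clone>(ranges: &[(K, K)]) -> bool {
    let mut sorted: Vec<(K, K)> = ranges.to_vec();
    // Sort by min, breaking ties by max, so the result is deterministic.
    sorted.sort();
    // Once sorted by min, any overlapping pair implies that some neighboring
    // pair overlaps too, so only neighbors need to be compared.
    sorted.windows(2).any(|pair| pair[0].1 > pair[1].0)
}
```

Unlike a check of the inputs in their current order, this answers only the Step 2 question. Ranges that are disjoint but out of order return `false` here and are handled by the reordering in Step 3.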
Step 3: Reorder inputs, if needed and possible
If the input partitions are non-overlapping, attempt to reorder the input partitions if needed
It is possible to reorder input partitions for certain plans such as UnionExec and ParquetExec, but it is not possible to reorder the input partitions for others (like RepartitionExec)
If we cannot reorder the partitions into increasing lexical order ==> no transform
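For inputs that can be reordered, Steps 2 and 3 can be combined into a single sort. The sketch below is a std-only simplification of the idea behind `sort_by_value_ranges`: it sorts (plan, range) pairs together instead of ranking arrow arrays, uses a generic `Ord` key, and assumes ascending order. The function name is hypothetical.

```rust
/// Reorder `plans` by the ascending min of their (min, max) ranges.
/// Returns None (meaning "no transform") if the lengths differ or if the
/// ranges overlap once sorted.
fn reorder_by_min<T, K: Ord>(plans: Vec<T>, ranges: Vec<(K, K)>) -> Option<Vec<(T, (K, K))>> {
    if plans.len() != ranges.len() {
        return None;
    }
    // Keep each plan next to its range so both are reordered together.
    let mut zipped: Vec<(T, (K, K))> = plans.into_iter().zip(ranges).collect();
    zipped.sort_by(|a, b| a.1.cmp(&b.1));
    // After sorting, every range must end before the next one starts.
    let disjoint = zipped
        .windows(2)
        .all(|pair| (pair[0].1).1 <= (pair[1].1).0);
    disjoint.then_some(zipped)
}
```

Applied to the inputs of Counter Example 1, this moves Partition 3 in front of Partition 2, after which concatenation is safe. Applied to Counter Example 2, it returns `None`. Whether a given plan node allows its inputs to be reordered at all is a separate question this sketch does not address.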