Description
Describe the bug
We have a case where the EnforceDistribution rule repartitioned a ParquetExec to parallelize the read (which is good), but that parallelization destroyed the sort order (because it mixes parts of different files together in the same partition). The rest of the plan relies on the output being sorted, so since it is no longer sorted we see incorrect results.
To Reproduce
The input plan looks like this:

```text
OutputRequirementExec
  ProjectionExec: expr=[tag@1 as tag]
    FilterExec: CAST(field@0 AS Utf8) !=
      ProjectionExec: expr=[field@1 as field, tag@3 as tag]
        DeduplicateExec: [tag@3 ASC,time@2 ASC]
          FilterExec: tag@3 > foo AND time@2 > 2
            ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[__chunk_order, field, time, tag], output_ordering=[tag@3 ASC, time@2 ASC, __chunk_order@0 ASC], ...
```
The output of EnforceDistribution looks like this:

```text
2023-12-06T18:40:19.827226Z TRACE datafusion::physical_planner: Optimized physical plan by EnforceDistribution:
OutputRequirementExec
  ProjectionExec: expr=[tag@1 as tag]
    FilterExec: CAST(field@0 AS Utf8) !=
      RepartitionExec: partitioning=RoundRobinBatch(6), input_partitions=1
        ProjectionExec: expr=[field@1 as field, tag@3 as tag]
          DeduplicateExec: [tag@3 ASC,time@2 ASC]
            SortPreservingMergeExec: [tag@3 ASC,time@2 ASC,__chunk_order@0 ASC] <----- This needs the input to be sorted
              FilterExec: tag@3 > foo AND time@2 > 2
                ParquetExec: file_groups={6 groups: [[1.parquet:0..1, 2.parquet:0..16666666], [2.parquet:16666666..33333333], [2.parquet:33333333..50000000], [2.parquet:50000000..66666667], [2.parquet:66666667..83333334], ...]}, ... <---- this file is no longer sorted (as it was repartitioned)
```
Specifically, the DataFusion planner parallelized the read of the parquet files into multiple partitions, and in so doing destroyed the sort order (the `16666666..33333333` annotations mean "read that byte range of the file"). This is actually reflected correctly by the ParquetExec (it no longer reports an `output_ordering` because its output is no longer sorted). However, the plan now has a SortPreservingMerge added above it, which implies that its input is sorted, which is incorrect.
Input:

```text
ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[__chunk_order, field, time, tag], output_ordering=[tag@3 ASC, time@2 ASC, __chunk_order@0 ASC], ...
```

Output:

```text
ParquetExec: file_groups={6 groups: [[1.parquet:0..1, 2.parquet:0..16666666], [2.parquet:16666666..33333333], [2.parquet:33333333..50000000], [2.parquet:50000000..66666667], [2.parquet:66666667..83333334], ...]}, ...
```
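The failure mode can be illustrated without DataFusion. Below is a minimal sketch (plain Rust, made-up data) of a merge operator that, like SortPreservingMergeExec, assumes each input partition is already sorted; feed it one unsorted partition and it silently emits out-of-order rows rather than failing:

```rust
/// K-way merge that ASSUMES each input partition is sorted:
/// it always emits the smallest of the current head elements.
fn sort_preserving_merge(parts: Vec<Vec<i64>>) -> Vec<i64> {
    let mut idx = vec![0usize; parts.len()];
    let mut out = Vec::new();
    loop {
        // Pick the partition whose current head element is smallest.
        let mut best: Option<usize> = None;
        for (p, part) in parts.iter().enumerate() {
            if idx[p] < part.len() {
                match best {
                    Some(b) if parts[b][idx[b]] <= part[idx[p]] => {}
                    _ => best = Some(p),
                }
            }
        }
        match best {
            Some(p) => {
                out.push(parts[p][idx[p]]);
                idx[p] += 1;
            }
            None => return out, // all partitions exhausted
        }
    }
}

fn main() {
    // Sorted inputs -> globally sorted output, as the plan expects.
    let ok = sort_preserving_merge(vec![vec![1, 3, 5], vec![2, 4, 6]]);
    assert_eq!(ok, vec![1, 2, 3, 4, 5, 6]);

    // A partition built from byte ranges of different sorted files is
    // no longer sorted itself; the merge silently emits unsorted output.
    let bad = sort_preserving_merge(vec![vec![5, 1, 3], vec![2, 4, 6]]);
    assert_eq!(bad, vec![2, 4, 5, 1, 3, 6]); // NOT sorted: 5 precedes 1
}
```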
So things that are wrong:
- The output of the scan is no longer sorted, but it is being merged using SortPreservingMerge (which avoids the required resort).
- It is not right to repartition the sorted input files into multiple partitions in the first place, as that destroys the sort order. There is a config setting that is supposed to control this, `datafusion.optimizer.prefer_existing_sort`, and IOx sets it to true.
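For reference, this is roughly how that flag can be set programmatically; a hedged sketch assuming the string-keyed `SessionConfig::set_bool` API, which may differ between DataFusion versions:

```rust
use datafusion::prelude::*;

// Sketch: build a SessionContext that asks the optimizer to keep
// existing sort orders instead of inserting new sorts. Verify the
// exact builder methods against your DataFusion version.
fn ctx_preferring_existing_sort() -> SessionContext {
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.prefer_existing_sort", true);
    SessionContext::new_with_config(config)
}
```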
I am working on a reproducer in DataFusion.
Expected behavior
The correct answer should be produced.
I think this means that either:
- the ParquetExec should not be repartitioned if doing so would destroy the sort order, or
- the ParquetExec repartitioning code should be aware of the sort order and preserve it when repartitioning
Additional context
We found that setting the config setting `datafusion.optimizer.repartition_file_scans` to false (which IOx now does) works around the issue.
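A hedged sketch of applying the workaround, assuming `SessionConfig::with_repartition_file_scans` (present in recent DataFusion releases; check your version's API docs):

```rust
use datafusion::prelude::*;

// Sketch: disable splitting of file scans into byte-range groups, so
// sorted Parquet files are not repartitioned and the ordering survives.
fn ctx_without_file_scan_repartitioning() -> SessionContext {
    let config = SessionConfig::new().with_repartition_file_scans(false);
    SessionContext::new_with_config(config)
}
```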