
Incorrect results due to repartitioning a sorted ParquetExec #8451

Closed
@alamb

Description

Describe the bug

We have a case where the EnforceDistribution rule repartitioned a ParquetExec. That parallelized the read (which is good), but the parallelization destroyed the sort order, because it mixes parts of different files together in the same partition. The rest of the plan relies on the output being sorted, and since it is no longer sorted we see incorrect results.

To Reproduce

The input plan looks like this:

OutputRequirementExec
  ProjectionExec: expr=[tag@1 as tag]
    FilterExec: CAST(field@0 AS Utf8) !=
      ProjectionExec: expr=[field@1 as field, tag@3 as tag]
        DeduplicateExec: [tag@3 ASC,time@2 ASC]
          FilterExec: tag@3 > foo AND time@2 > 2
            ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[__chunk_order, field, time, tag], output_ordering=[tag@3 ASC, time@2 ASC, __chunk_order@0 ASC], ...

The output of EnforceDistribution looks like this:

2023-12-06T18:40:19.827226Z TRACE datafusion::physical_planner: Optimized physical plan by EnforceDistribution:
OutputRequirementExec
  ProjectionExec: expr=[tag@1 as tag]
    FilterExec: CAST(field@0 AS Utf8) !=
      RepartitionExec: partitioning=RoundRobinBatch(6), input_partitions=1
        ProjectionExec: expr=[field@1 as field, tag@3 as tag]
          DeduplicateExec: [tag@3 ASC,time@2 ASC]
            SortPreservingMergeExec: [tag@3 ASC,time@2 ASC,__chunk_order@0 ASC] <----- This needs the input to be sorted
              FilterExec: tag@3 > foo AND time@2 > 2
                ParquetExec: file_groups={6 groups: [[1.parquet:0..1, 2.parquet:0..16666666], [2.parquet:16666666..33333333], [2.parquet:33333333..50000000], [2.parquet:50000000..66666667], [2.parquet:66666667..83333334], ...]}, ... <---- this file is no longer sorted (as it was repartitioned)

Specifically, the DataFusion planner parallelized the read of the parquet files into multiple partitions and in so doing has destroyed the sort order.

(an annotation like 2.parquet:16666666..33333333 means "read that byte range of the file")

The ParquetExec actually reflects this correctly (it no longer reports an output_ordering because its output is no longer sorted). However, the plan now has a SortPreservingMergeExec above it, which assumes its input partitions are sorted, and that assumption no longer holds.

Input:

ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[__chunk_order, field, time, tag], output_ordering=[tag@3 ASC, time@2 ASC, __chunk_order@0 ASC],....

Output:

ParquetExec: file_groups={6 groups: [[1.parquet:0..1, 2.parquet:0..16666666], [2.parquet:16666666..33333333], [2.parquet:33333333..50000000], [2.parquet:50000000..66666667], [2.parquet:66666667..83333334], ...]}, ...

So things that are wrong:

  1. The output of the scan is no longer sorted, but it is merged with SortPreservingMergeExec (which skips the re-sort that would otherwise be required)
  2. It is not right to repartition the sorted input files into multiple partitions in the first place, as that destroys the sort order. There is a config setting, datafusion.optimizer.prefer_existing_sort, that is supposed to control this, and IOx sets it to true (see the sketch below):
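
For reference, here is a minimal sketch of setting that option through DataFusion's SessionConfig. The set_bool call and the option name are real DataFusion API; the surrounding setup is illustrative, and the exact constructor name may differ between versions:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Ask the planner to prefer plans that keep an existing sort order
    // over plans that repartition for parallelism and then re-sort.
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.prefer_existing_sort", true);
    let ctx = SessionContext::new_with_config(config);
    // ... register tables and run queries on `ctx` as usual ...
    let _ = ctx;
}
```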

I am working on a reproducer in DataFusion.

Expected behavior

The correct answer should be produced.

I think this means that either:

  1. the ParquetExec should not be repartitioned if doing so would destroy the sort order, or
  2. the ParquetExec repartitioning code should be aware of the sort order and split the files in a way that preserves it

Additional context

We found that setting the config setting datafusion.optimizer.repartition_file_scans to false was a workaround, and IOx now sets it to false:
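
A minimal sketch of that workaround, assuming the same SessionConfig API as above (with_repartition_file_scans is a real SessionConfig builder method; the rest is illustrative):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Do not split individual parquet files across multiple partitions;
    // each file group then keeps its declared output ordering.
    let config = SessionConfig::new().with_repartition_file_scans(false);
    let ctx = SessionContext::new_with_config(config);
    // The same can be done per-session via SQL:
    //   SET datafusion.optimizer.repartition_file_scans = false;
    let _ = ctx;
}
```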
