
Extend TopK early termination to partially sorted inputs #15529

Closed
@geoffreyclaude

Description


Is your feature request related to a problem or challenge?

DataFusion currently has a "TopK early termination" optimization, which speeds up queries that involve ORDER BY and LIMIT if the input data is already sorted by the full ordering requested in the query.

However, many real-world scenarios involve datasets that are only partially sorted.

For example, consider a time-series dataset that's pre-sorted by day but not sorted within each day. Queries requesting data sorted by (day, timestamp) should still benefit significantly from this optimization: once DataFusion has collected the required number of rows from the most recent day(s), it can safely ignore data from earlier days.

Today, DataFusion does not take advantage of such partial ordering, resulting in unnecessary scans and sorts.

Example query affected by this:

SELECT day, sensor_id, reading, timestamp
FROM sensor_readings
WHERE sensor_id = 1002
ORDER BY day DESC, timestamp DESC
LIMIT 10;

If the data source providing sensor_readings can guarantee a day DESC ordering, this query should finish quickly after scanning enough rows from the most recent days, but DataFusion currently scans the entire sensor_readings table unnecessarily.
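To make the intended behavior concrete, here is a minimal Rust sketch (not DataFusion code) of a TopK for the query above, assuming rows arrive already sorted by day DESC. The `Row` type, the `topk_day_desc` function, and the integer encoding of day and timestamp are hypothetical simplifications; a real operator would work on Arrow record batches.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical row layout: (day, sensor_id, timestamp).
type Row = (i32, i32, i64);

/// Sketch of `WHERE sensor_id = sensor ORDER BY day DESC, timestamp DESC LIMIT k`
/// over input already sorted by day DESC. Returns the top-k (day, timestamp)
/// pairs, best first, and the number of input rows read (including the row
/// that triggered early termination).
fn topk_day_desc(input: &[Row], sensor: i32, k: usize) -> (Vec<(i32, i64)>, usize) {
    // Min-heap on (day, timestamp): the root is the worst retained row,
    // i.e. the one that sorts last under DESC, DESC.
    let mut heap: BinaryHeap<Reverse<(i32, i64)>> = BinaryHeap::new();
    let mut rows_read = 0;
    for &(day, sensor_id, ts) in input {
        rows_read += 1;
        // Early termination: the heap is full and this row's day is strictly
        // earlier than the worst retained day. Because the input is sorted by
        // day DESC, every remaining row is at least as early, so none can enter.
        if heap.len() == k {
            if let Some(Reverse((worst_day, _))) = heap.peek() {
                if day < *worst_day {
                    break;
                }
            }
        }
        if sensor_id != sensor {
            continue; // filtered out by the WHERE clause
        }
        heap.push(Reverse((day, ts)));
        if heap.len() > k {
            heap.pop(); // evict the worst row so at most k remain
        }
    }
    // Ascending order of Reverse<_> is descending order of (day, timestamp).
    let top: Vec<(i32, i64)> = heap.into_sorted_vec().into_iter().map(|Reverse(r)| r).collect();
    (top, rows_read)
}
```

When the most recent day already yields k matching rows, the loop stops at the first row of the previous day instead of reading the rest of the input. Note that the termination check runs before the filter: it only depends on the sort prefix, so filtered-out rows from earlier days also end the scan.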

Describe the solution you'd like

I propose extending DataFusion's existing "TopK early termination" optimization to handle cases where the input data is partially sorted by a prefix of the requested ordering.

Specifically, DataFusion should detect:

  • When the input ordering has a non-empty common prefix with the query's requested ordering.
  • When the top-K buffer is full.
  • When all still-pending rows are guaranteed to be strictly greater (in sort order) than the top-K's max value, comparing only on the common prefix.
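The three conditions above can be sketched as follows. This is a hedged illustration, not DataFusion's API: `SortKey`, `common_prefix_len`, `cmp_on_prefix`, and `can_terminate` are hypothetical names, and rows are modeled as `i64` slices whose columns are aligned with the requested sort keys. Instead of inspecting every pending row, the sketch checks only the last row read: because the input is sorted on the common prefix, every later row compares greater than or equal to it on that prefix.

```rust
use std::cmp::Ordering;

/// Hypothetical sort key: a column name plus its direction.
#[derive(Clone, Debug, PartialEq)]
struct SortKey {
    column: &'static str,
    descending: bool,
}

/// Condition 1: length of the common prefix between the ordering the input
/// guarantees and the ordering requested by ORDER BY. Keys must match in
/// both column and direction.
fn common_prefix_len(input: &[SortKey], requested: &[SortKey]) -> usize {
    input
        .iter()
        .zip(requested)
        .take_while(|(a, b)| a == b)
        .count()
}

/// Compares two rows on the first `prefix_len` requested keys, honoring each
/// key's direction. `Ordering::Greater` means "sorts later". Row values are
/// assumed to be aligned with `keys` (column i holds the value for key i).
fn cmp_on_prefix(a: &[i64], b: &[i64], keys: &[SortKey], prefix_len: usize) -> Ordering {
    for (i, key) in keys.iter().take(prefix_len).enumerate() {
        let ord = a[i].cmp(&b[i]);
        let ord = if key.descending { ord.reverse() } else { ord };
        if ord != Ordering::Equal {
            return ord;
        }
    }
    Ordering::Equal
}

/// Conditions 2 and 3: the top-K buffer is full, and the last row read sorts
/// strictly after the worst retained row on the common prefix. Since the
/// input is sorted on that prefix, every pending row then does too.
fn can_terminate(
    topk_len: usize,
    k: usize,
    worst_retained: Option<&[i64]>,
    last_read: &[i64],
    keys: &[SortKey],
    prefix_len: usize,
) -> bool {
    if prefix_len == 0 || topk_len < k {
        return false;
    }
    match worst_retained {
        Some(worst) => cmp_on_prefix(last_read, worst, keys, prefix_len) == Ordering::Greater,
        None => false,
    }
}
```

A key with the same column but the opposite direction (for example `day ASC` versus `day DESC`) does not count toward the common prefix, since the input order would then be reversed relative to the request.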

Under these conditions, DataFusion can safely terminate scanning early, significantly improving query performance and reducing resource consumption.

Describe alternatives you've considered

No response

Additional context

I wasn't able to find benchmarks on already-sorted data. EDIT: I opened #15559, with an attached PR, to track this need.

However, a simple reproducer based on the TPC-H dataset could be:

CREATE EXTERNAL TABLE lineitem_ship (
    l_shipdate     DATE,
    l_commitdate   DATE,
    l_shipmode     VARCHAR,
    l_quantity     INT
)
STORED AS PARQUET
LOCATION 'scratch/topk'
WITH ORDER (l_shipdate);

INSERT INTO lineitem_ship
SELECT
    l_shipdate,
    l_commitdate,
    l_shipmode,
    l_quantity
FROM lineitem
ORDER BY l_shipdate;

SELECT
    l_shipdate,
    l_commitdate,
    l_quantity
FROM lineitem_ship
WHERE l_shipmode IN ('MAIL', 'AIR')
ORDER BY l_shipdate, l_commitdate, l_quantity
LIMIT 10;

This query today scans the full lineitem_ship table. I'd expect it to be orders of magnitude faster with the sort prefix enhancement.

Note: this optimization could also be extended to allow streaming out partial results of the SortExec (with or without fetch limit) as soon as their ordering is guaranteed.
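A minimal sketch of the streaming idea, assuming an input sorted ascending on its first column and a requested ordering on both columns, as with `l_shipdate, l_commitdate` in the reproducer. `partial_sort` and `flush_run` are hypothetical helpers operating on `(i32, i32)` tuples rather than SortExec's actual batches: each run of rows sharing the same prefix value is sorted and emitted as soon as the prefix changes, because no later row can sort before it.

```rust
/// Hypothetical helper: sorts one completed run (all rows share the same
/// prefix value, so this effectively sorts by the second column) and appends
/// it to `out`, stopping once `out` holds `limit` rows.
/// Returns true when the limit has been reached.
fn flush_run(run: &mut Vec<(i32, i32)>, out: &mut Vec<(i32, i32)>, limit: usize) -> bool {
    run.sort_unstable();
    for row in run.drain(..) {
        if out.len() >= limit {
            break;
        }
        out.push(row);
    }
    out.len() >= limit
}

/// Hypothetical partial sort: `input` is sorted by the first column and the
/// output must be sorted by (first, second). Returns the emitted rows
/// (collected into a Vec here for simplicity) and the number of input rows read.
fn partial_sort(input: &[(i32, i32)], fetch: Option<usize>) -> (Vec<(i32, i32)>, usize) {
    let limit = fetch.unwrap_or(usize::MAX);
    let mut out: Vec<(i32, i32)> = Vec::new();
    let mut run: Vec<(i32, i32)> = Vec::new();
    let mut rows_read = 0;
    for &row in input {
        rows_read += 1;
        // A new prefix value means the current run is complete: no later row
        // can sort before it, so it can be sorted and emitted now.
        let prefix_changed = run.last().map_or(false, |&(prefix, _)| prefix != row.0);
        if prefix_changed && flush_run(&mut run, &mut out, limit) {
            return (out, rows_read); // fetch limit reached: stop reading input
        }
        run.push(row);
    }
    flush_run(&mut run, &mut out, limit);
    (out, rows_read)
}
```

With `fetch` set, reading stops at the first prefix change after enough rows have been emitted; without `fetch`, the output is fully sorted while only the current run needs to be buffered before it is emitted.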

Metadata

Labels: enhancement (New feature or request)
