Description
Is your feature request related to a problem or challenge?
DataFusion currently has a "TopK early termination" optimization, which speeds up queries that involve ORDER BY
and LIMIT
if the input data is already sorted by the full ordering requested in the query.
However, many real-world scenarios involve datasets that are only partially sorted.
For example, consider a time-series dataset that's pre-sorted by day
but not sorted within each day. Queries requesting data sorted by day, timestamp
should still benefit significantly from optimization because once DataFusion has collected the required number of rows from the most recent day(s), it could safely ignore data from earlier days.
Today, DataFusion does not take advantage of such partial ordering, resulting in unnecessary scans and sorts.
Example query affected by this:
SELECT day, sensor_id, reading, timestamp
FROM sensor_readings
WHERE sensor_id = 1002
ORDER BY day DESC, timestamp DESC
LIMIT 10;
If the data source providing sensor_readings
can guarantee a day DESC
ordering, this query should quickly finish after scanning enough rows from the most recent days, but currently DataFusion will continue scanning unnecessarily the full sensor_readings
.
Describe the solution you'd like
I propose extending DataFusion's existing "TopK early termination" optimization to handle cases where the input data is partially sorted by a prefix of the requested ordering.
Specifically, DataFusion should detect:
- When the input ordering has a non-empty common prefix with the query's requested ordering.
- When the top-K buffer is full.
- If all still pending rows are guaranteed to be strictly greater than the top-K's max value, comparing only on the common prefix.
Under these conditions, DataFusion can safely terminate scanning early, significantly improving query performance and reducing resource consumption.
Describe alternatives you've considered
No response
Additional context
I wasn't able to find benchmarks on already sorted data. EDIT: I opened #15559 to track this need with an attached PR
However, a simple reproducer from the TPCH dataset could be:
CREATE EXTERNAL TABLE lineitem_ship (
l_shipdate DATE,
l_commitdate DATE,
l_shipmode VARCHAR,
l_quantity INT
)
STORED AS PARQUET
LOCATION 'scratch/topk'
WITH ORDER (l_shipdate);
INSERT INTO lineitem_ship
SELECT
l_shipdate,
l_commitdate,
l_shipmode,
l_quantity
FROM lineitem
ORDER BY l_shipdate;
SELECT
l_shipdate,
l_commitdate,
l_quantity
FROM lineitem_ship
WHERE l_shipmode IN ('MAIL', 'AIR')
ORDER BY l_shipdate, l_commitdate, l_quantity
LIMIT 10;
This query today scans the full lineitem_ship
table. I'd expect it to be orders of magnitude faster with the sort prefix enhancement.
Note: this optimization could also be extended to allow streaming out partial results of the SortExec
(with or without fetch limit) as soon as their ordering is guaranteed.