Conversation

@zhuqi-lucas
Contributor

@zhuqi-lucas zhuqi-lucas commented Feb 3, 2025

Which issue does this PR close?

Closes #14406

Rationale for this change

Fix the behaviour of limit with CoalescePartitionsExec.

CoalescePartitionsExec merges all partitions into one, but each partition only enforces its own local limit, so we should not remove the global limit above CoalescePartitionsExec.
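
Schematically (an illustrative plan shape, not output taken from this PR), the global limit above the merge is what actually enforces the final row count; with N partitions the per-partition local limits alone can let through up to N rows:

GlobalLimitExec: fetch=1            <-- must be kept
  CoalescePartitionsExec            <-- merges N partitions into one
    LocalLimitExec: fetch=1         <-- only limits each partition
      ...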

What changes are included in this PR?

Fix the behaviour of limit with CoalescePartitionsExec.

Are these changes tested?

Yes, sqllogictest (.slt) tests were added.

Are there any user-facing changes?

Yes, it fixes the user-facing issue shown below.

Before this PR:

with selection as (
    select *
    from 'parquet_files/*'
    limit 1
)
select 1 as foo
from selection
order by duration
limit 1000;

I get two rows, even though the inner selection is limited to one:

+-----+
| foo |
+-----+
| 1   |
| 1   |
+-----+
2 row(s) fetched.

@github-actions github-actions bot added optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) labels Feb 3, 2025
@zhuqi-lucas
Contributor Author

zhuqi-lucas commented Feb 3, 2025

error: use of deprecated constant `arrow::datatypes::MAX_DECIMAL_FOR_EACH_PRECISION`: Use MAX_DECIMAL128_FOR_EACH_PRECISION (note indexes are different)
  --> datafusion/optimizer/src/unwrap_cast_in_comparison.rs:29:25
   |
29 |     DataType, TimeUnit, MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION,
   |                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: `-D deprecated` implied by `-D warnings`
   = help: to override `-D warnings` add `#[allow(deprecated)]`

error: use of deprecated constant `arrow::datatypes::MIN_DECIMAL_FOR_EACH_PRECISION`: Use MIN_DECIMAL128_FOR_EACH_PRECISION (note indexes are different)
  --> datafusion/optimizer/src/unwrap_cast_in_comparison.rs:29:57
   |
29 |     DataType, TimeUnit, MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION,
   |                                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error: use of deprecated constant `arrow::datatypes::MIN_DECIMAL_FOR_EACH_PRECISION`: Use MIN_DECIMAL128_FOR_EACH_PRECISION (note indexes are different)
   --> datafusion/optimizer/src/unwrap_cast_in_comparison.rs:372:13
    |
372 |             MIN_DECIMAL_FOR_EACH_PRECISION[*precision as usize - 1],
    |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error: use of deprecated constant `arrow::datatypes::MAX_DECIMAL_FOR_EACH_PRECISION`: Use MAX_DECIMAL128_FOR_EACH_PRECISION (note indexes are different)
   --> datafusion/optimizer/src/unwrap_cast_in_comparison.rs:373:13
    |
373 |             MAX_DECIMAL_FOR_EACH_PRECISION[*precision as usize - 1],
    |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The clippy complaints are not related to this PR. They will be fixed by:

#14414

@zhuqi-lucas
Contributor Author

cc @alamb @adriangb
The PR is ready for review.

@adriangb
Contributor

adriangb commented Feb 3, 2025

Tests look good to me. I have not touched much of the optimizer, so I'm not confident reviewing the change itself.

Your explanation is great; I think it is important to record for posterity what the root cause was.
One thing that still doesn't make sense to me: why does this only happen with an ORDER BY?

@ozankabak
Contributor

Thanks for the patch. We may be able to do this without checking for specific operators (like CoalescePartitionsExec), but by using the APIs on the execution plan. @mertak-synnada, can you please review?

@zhuqi-lucas
Contributor Author

zhuqi-lucas commented Feb 3, 2025

Thank you @adriangb for the review. This only happens when we sort with a limit, because of the following logic:

// If we have a non-limit operator with fetch capability, update global
// state as necessary:
if pushdown_plan.fetch().is_some() {
    if global_state.fetch.is_none() {
        global_state.satisfied = true;
    }
    (global_state.skip, global_state.fetch) = combine_limit(
        global_state.skip,
        global_state.fetch,
        0,
        pushdown_plan.fetch(),
    );
}
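
For intuition, here is a hypothetical standalone sketch of the combine semantics (not the actual DataFusion combine_limit helper, whose code may differ): the parent limit applies on top of the child's output, so the skips add up and the surviving fetch is the tightest window satisfying both.

// Hypothetical sketch only; illustrates the semantics, not the real helper.
fn combine_limit(
    parent_skip: usize,
    parent_fetch: Option<usize>,
    child_skip: usize,
    child_fetch: Option<usize>,
) -> (usize, Option<usize>) {
    // Skipping `child_skip` rows first and then `parent_skip` more is one
    // combined skip of their sum.
    let skip = child_skip.saturating_add(parent_skip);
    // The parent consumes the child's output, so the child's fetch shrinks
    // by the parent's skip before being compared with the parent's fetch.
    let fetch = match (parent_fetch, child_fetch) {
        (Some(p), Some(c)) => Some(p.min(c.saturating_sub(parent_skip))),
        (Some(p), None) => Some(p),
        (None, Some(c)) => Some(c.saturating_sub(parent_skip)),
        (None, None) => None,
    };
    (skip, fetch)
}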

When sorting with a limit, the following steps cause the bug:

  1. global_state.satisfied is set to true.
  2. Without this PR, the original logic then removes the global limit.
  3. When we reach the decision for plans that support pushdown (CoalescePartitionsExec supports limit pushdown since "Add LimitPushdown optimization rule and CoalesceBatchesExec fetch" #11652), we do not add the global limit back above CoalescePartitionsExec, because global_state.satisfied is already set to true:
if pushdown_plan.supports_limit_pushdown() {
    if !combines_input_partitions(&pushdown_plan) {
        // We have information in the global state and the plan pushes down,
        // continue:
        Ok((Transformed::no(pushdown_plan), global_state))
    } else if let Some(plan_with_fetch) = pushdown_plan.with_fetch(skip_and_fetch) {
        // This plan is combining input partitions, so we need to add the
        // fetch info to plan if possible. If not, we must add a `LimitExec`
        // with the information from the global state.
        let mut new_plan = plan_with_fetch;
        // Execution plans can't (yet) handle skip, so if we have one,
        // we still need to add a global limit
        if global_state.skip > 0 {
            new_plan =
                add_global_limit(new_plan, global_state.skip, global_state.fetch);
        }
        global_state.fetch = skip_and_fetch;
        global_state.skip = 0;
        global_state.satisfied = true;
        Ok((Transformed::yes(new_plan), global_state))
    } else if global_state.satisfied {
        // If the plan is already satisfied, do not add a limit:
        Ok((Transformed::no(pushdown_plan), global_state))
    } else {
        global_state.satisfied = true;
        Ok((
            Transformed::yes(add_limit(
                pushdown_plan,
                global_state.skip,
                global_fetch,
            )),
            global_state,
        ))
    }
}

@zhuqi-lucas
Contributor Author

Thank you @ozankabak for the review. Yes, I believe adding a with_fetch API to CoalescePartitionsExec may also solve this issue; I have noted it as a TODO in the PR comments.

@xudong963 xudong963 self-requested a review February 3, 2025 15:15
Member

@xudong963 xudong963 left a comment


Generally LGTM; I agree with @ozankabak's suggestion.
Maybe you can also file an issue.

if limit_exec.input().as_any().is::<CoalescePartitionsExec>() {
// If the child is a `CoalescePartitionsExec`, we should not remove the limit:
// pushing it down through the `CoalescePartitionsExec` applies it to each
// partition separately, which does not guarantee the global limit.
// todo: we may have a better solution if we can support with_fetch for a limit inside CoalescePartitionsExec.
Member


TODO:

Contributor Author


Thank you @xudong963 for the review. Changed the comments and added a follow-up issue:
#14446

@zhuqi-lucas
Contributor Author

zhuqi-lucas commented Feb 3, 2025

Generally LGTM; I agree with @ozankabak's suggestion. Maybe you can also file an issue.

Thank you @xudong963 for the review; added the follow-up:
#14446

global_state.skip = skip;
global_state.fetch = fetch;

if limit_exec.input().as_any().is::<CoalescePartitionsExec>() {

@mertak-synnada mertak-synnada Feb 4, 2025


While I agree with the suggestion to check via the API, please also check with the combines_input_partitions() helper function so that SortPreservingMerge is covered as well.

In the optimizer logic, we remove the Limit operators first and then re-add them at the lowest possible point in the plan; if the plan is "satisfied", we drop the limit information. So if the plan is combining input partitions, we only add a global limit when skip information is present; maybe we can identify whether the local limits are sufficient and decide there whether to add the global limit. But in the end, rather than adding a global limit, I think we should be able to apply the limit inside CoalescePartitionsExec or SortPreservingMerge so that they don't unnecessarily pull more data. (A sketch of the helper follows the snippet below.)

// Execution plans can't (yet) handle skip, so if we have one,
// we still need to add a global limit
if global_state.skip > 0 {
   new_plan =
       add_global_limit(new_plan, global_state.skip, global_state.fetch);
}
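
For context, the combines_input_partitions() helper mentioned above can be sketched roughly like this (paraphrased; the exact code in limit_pushdown.rs may differ):

use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;

// Rough sketch: a plan "combines input partitions" when it merges many
// partitions into one, which is exactly where per-partition local limits
// stop being sufficient on their own.
fn combines_input_partitions(plan: &Arc<dyn ExecutionPlan>) -> bool {
    let plan = plan.as_any();
    plan.is::<CoalescePartitionsExec>() || plan.is::<SortPreservingMergeExec>()
}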

Contributor Author

@zhuqi-lucas zhuqi-lucas Feb 4, 2025


Thank you @mertak-synnada for the review:

While I agree with the suggestion to check via the API, please also check with the combines_input_partitions() helper function so that SortPreservingMerge is covered as well.

I agree. I already checked SortPreservingMergeExec: it supports with_fetch() and fetch(), so I think it is not affected?

impl SortPreservingMergeExec {
    /// Create a new sort execution plan
    pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
        let cache = Self::compute_properties(&input, expr.clone());
        Self {
            input,
            expr,
            metrics: ExecutionPlanMetricsSet::new(),
            fetch: None,
            cache,
            enable_round_robin_repartition: true,
        }
    }

    /// Sets the number of rows to fetch
    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }

    /// Sets the selection strategy of tied winners of the loser tree algorithm
    ///
    /// If true (the default) equal output rows are placed in the merged stream
    /// in round robin fashion. This approach consumes input streams at more
    /// even rates when there are many rows with the same sort key.
    ///
    /// If false, equal output rows are always placed in the merged stream in
    /// the order of the inputs, resulting in potentially slower execution but a
    /// stable output order.
    pub fn with_round_robin_repartition(
        mut self,
        enable_round_robin_repartition: bool,
    ) -> Self {
        self.enable_round_robin_repartition = enable_round_robin_repartition;
        self
    }

    /// Input schema
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Sort expressions
    pub fn expr(&self) -> &LexOrdering {
        self.expr.as_ref()
    }

    /// Fetch
    pub fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    /// Creates the cache object that stores the plan properties
    /// such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        ordering: LexOrdering,
    ) -> PlanProperties {
        let mut eq_properties = input.equivalence_properties().clone();
        eq_properties.clear_per_partition_constants();
        eq_properties.add_new_orderings(vec![ordering]);
        PlanProperties::new(
            eq_properties,                        // Equivalence Properties
            Partitioning::UnknownPartitioning(1), // Output Partitioning
            input.pipeline_behavior(),            // Pipeline Behavior
            input.boundedness(),                  // Boundedness
        )
    }
}

But in the end, rather than adding a global limit, I think we should be able to apply the limit inside CoalescePartitionsExec or SortPreservingMerge so that they don't unnecessarily pull more data.

I totally agree with this! So I created follow-up #14446 to support a limit inside CoalescePartitionsExec; SortPreservingMerge already supports this, per the code above. A rough sketch of the idea follows below.

So if the plan is combining input partitions, we only add a global limit when skip information is present; maybe we can identify whether the local limits are sufficient and decide there whether to add the global limit.

This is a good point; we can create another issue to try to improve this!
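
As a rough illustration of what the #14446 follow-up could look like (a hypothetical sketch, not the code in this PR or in DataFusion): a fetch-aware coalesce stops emitting rows once fetch rows have been produced from the merged stream.

// Hypothetical sketch (follow-up idea from #14446, not this PR's code):
// a fetch-aware coalesce that truncates the merged single-partition
// stream after `fetch` rows. Vec<u64> stands in for a RecordBatch.
struct CoalesceWithFetch<I> {
    input: I,             // already-merged stream of batches
    fetch: Option<usize>, // max rows to emit; None = unlimited
    emitted: usize,
}

impl<I: Iterator<Item = Vec<u64>>> Iterator for CoalesceWithFetch<I> {
    type Item = Vec<u64>;

    fn next(&mut self) -> Option<Self::Item> {
        // Compute how many rows we may still emit.
        let remaining = match self.fetch {
            Some(fetch) if self.emitted >= fetch => return None,
            Some(fetch) => fetch - self.emitted,
            None => usize::MAX,
        };
        let mut batch = self.input.next()?;
        batch.truncate(remaining); // drop rows beyond the limit
        self.emitted += batch.len();
        Some(batch)
    }
}

fn main() {
    // Two "partitions" that were each locally limited to 2 rows, then merged:
    let merged = vec![vec![1, 2], vec![3, 4]].into_iter();
    let coalesced = CoalesceWithFetch { input: merged, fetch: Some(3), emitted: 0 };
    let rows: Vec<u64> = coalesced.flatten().collect();
    assert_eq!(rows, vec![1, 2, 3]); // global fetch=3 enforced at the merge
    println!("{rows:?}");
}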

Contributor Author

@zhuqi-lucas zhuqi-lucas Feb 4, 2025


Updated. I confirmed SortPreservingMerge works well with fetch:

# Check output plan, expect no "output_ordering" clause in the physical_plan -> ParquetExec:
query TT
explain with selection as (
    select *
    from test_table
    ORDER BY string_col, int_col limit 1
)
select 1 as foo
from selection
order by string_col
limit 1000;
----
logical_plan
01)Projection: foo
02)--Sort: selection.string_col ASC NULLS LAST, fetch=1000
03)----Projection: Int64(1) AS foo, selection.string_col
04)------SubqueryAlias: selection
05)--------Projection: test_table.string_col
06)----------Sort: test_table.string_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST, fetch=1
07)------------TableScan: test_table projection=[int_col, string_col]
physical_plan
01)ProjectionExec: expr=[foo@0 as foo]
02)--ProjectionExec: expr=[1 as foo, string_col@0 as string_col]
03)----ProjectionExec: expr=[string_col@1 as string_col]
04)------SortPreservingMergeExec: [string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], fetch=1
05)--------SortExec: TopK(fetch=1), expr=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], preserve_partitioning=[true]
06)----------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet]]}, projection=[int_col, string_col]


@mertak-synnada mertak-synnada left a comment


Thanks!

@berkaysynnada
Contributor

Thank you @zhuqi-lucas, @mertak-synnada and @xudong963. This looks good to me now, and I'm merging it. I guess @mertak-synnada will open a follow-up PR removing the explicit casting by utilizing a state parameter.

@berkaysynnada berkaysynnada merged commit 0d9f845 into apache:main Feb 4, 2025
25 checks passed
@adriangb
Contributor

adriangb commented Feb 4, 2025

Thank you all for the quick fix!

Dandandan pushed a commit to coralogix/arrow-datafusion that referenced this pull request Apr 14, 2025
* fix: Limits are not applied correctly

* Add easy fix

* Add fix

* Add slt testing

* Address comments
Dandandan added a commit to coralogix/arrow-datafusion that referenced this pull request Nov 17, 2025