
TopK dynamic filter pushdown attempt 2 #15770


Open · wants to merge 12 commits into main

Conversation

@adriangb (Contributor)

No description provided.

@github-actions bot added labels on Apr 18, 2025: physical-expr (Changes to the physical-expr crates), optimizer (Optimizer rules), core (Core DataFusion crate), common (Related to common crate), datasource (Changes to the datasource crate)
@@ -382,7 +383,7 @@ impl PhysicalOptimizerRule for PushdownFilter {

    context
        .transform_up(|node| {
-           if node.plan.as_any().downcast_ref::<FilterExec>().is_some() {
+           if node.plan.as_any().downcast_ref::<FilterExec>().is_some() || node.plan.as_any().downcast_ref::<SortExec>().is_some() {
@adriangb (Contributor Author)

@berkaysynnada I didn't notice this in the original PR. This seems problematic: doing downcast matching here is a smell that the API needs changing. It limits implementations to a hardcoded list of plans, which defeats the purpose of making DataFusion pluggable / having a dyn ExecutionPlan. The original implementation didn't require this. I think this goes hand in hand with the revisit parameter: it seems you were able to get from 3 methods down to 2 by replacing one of them with this downcast matching and the other with the extra recursion via the revisit parameter. It would be great to iterate on this and find a way to avoid the downcast matching.

@berkaysynnada (Contributor)

Yes, you're right. We could actually run this pushdown logic on every operator, but then it would always run at worst-case complexity. I've shared a solution for removing the revisit parameter; let me open an issue for that, and I believe it will be picked up and implemented soon.

To remove these downcasts, I think we can either introduce a new API method that just returns a boolean saying "this operator might introduce a filter", or try to infer that from the existing APIs, maybe with some refactoring. Do you have an idea for the latter?
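The boolean-method idea could look something like the following. This is a hedged, self-contained sketch with stand-in types; `introduces_filters` is a hypothetical name, not the actual DataFusion API. The point is that the optimizer rule asks each node via a trait method with a conservative default, instead of downcasting to a hardcoded list of concrete plan types.

```rust
// Stand-in trait, not the real datafusion::physical_plan::ExecutionPlan.
trait ExecutionPlan {
    fn name(&self) -> &str;

    // Default: most operators never introduce their own filters.
    fn introduces_filters(&self) -> bool {
        false
    }
}

struct ProjectionExec;
impl ExecutionPlan for ProjectionExec {
    fn name(&self) -> &str { "ProjectionExec" }
}

struct SortExec;
impl ExecutionPlan for SortExec {
    fn name(&self) -> &str { "SortExec" }
    // A TopK sort may introduce a dynamic filter, so it opts in.
    fn introduces_filters(&self) -> bool { true }
}

fn main() {
    // The rule can now ask any plan, including third-party implementations,
    // without maintaining a downcast list.
    let plans: Vec<Box<dyn ExecutionPlan>> = vec![Box::new(ProjectionExec), Box::new(SortExec)];
    for p in &plans {
        println!("{}: {}", p.name(), p.introduces_filters());
    }
}
```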

@adriangb (Contributor Author) on Apr 21, 2025

I propose an API something like this:

trait ExecutionPlan {
    fn gather_filters_for_pushdown(
        &self,
        parent_filters: &[Arc<dyn PhysicalExpr>],
    ) -> Result<FilterPushdownPlan> {
        let unsupported = vec![FilterPushdownSupport::Unsupported; parent_filters.len()];
        Ok(FilterPushdownPlan {
            parent_filters_for_children: vec![unsupported; self.children().len()],
            self_filters_for_children: vec![vec![]; self.children().len()],
        })
    }

    fn propagate_filter_pushdown(
        &self,
        parent_pushdown_result: Vec<FilterPushdownChildResult>,
        _self_filter_pushdown_result: Vec<FilterPushdownChildResult>,
    ) -> Result<FilterPushdownPropagation> {
        Ok(FilterPushdownPropagation {
            parent_filter_result: parent_pushdown_result,
            new_node: None,
        })
    }
}

pub struct FilterPushdownPropagation {
    parent_filter_result: Vec<FilterPushdownChildResult>,
    new_node: Option<Arc<dyn ExecutionPlan>>,
}

#[derive(Debug, Clone, Copy)]
pub enum FilterPushdownChildResult {
    Supported,
    Unsupported,
}

#[derive(Debug, Clone)]
pub enum FilterPushdownSupport {
    Supported(Arc<dyn PhysicalExpr>),
    Unsupported,
}

#[derive(Debug, Clone)]
pub struct FilterPushdownPlan {
    parent_filters_for_children: Vec<Vec<FilterPushdownSupport>>,
    self_filters_for_children: Vec<Vec<FilterPushdownSupport>>,
}

The optimizer rule will have to do a bit of bookkeeping and slice these vectors correctly, but this should avoid the need for any downcast matching or retrying, and it minimizes clones of plans. It also does a single walk down and up regardless of what ends up happening with the filters.
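The slicing bookkeeping can be illustrated with a minimal, self-contained sketch (stand-in types throughout; `filters_for_child` and the string-based filters are hypothetical, not the proposed API): for each child, the rule pairs the parent filters with that child's support markers and recurses with only the supported ones.

```rust
// Stand-in for FilterPushdownSupport in the proposal above.
#[derive(Debug, PartialEq)]
enum Support {
    Supported,
    Unsupported,
}

// Select the parent filters that this node marked as pushable to a child.
// Filters are plain strings here purely for illustration.
fn filters_for_child(parent_filters: &[&str], support: &[Support]) -> Vec<String> {
    parent_filters
        .iter()
        .zip(support)
        .filter(|(_, s)| **s == Support::Supported)
        .map(|(f, _)| f.to_string())
        .collect()
}

fn main() {
    let parent = ["a > 1", "b < 2"];
    // This node says its child can accept the first filter only.
    let plan = vec![Support::Supported, Support::Unsupported];
    println!("{:?}", filters_for_child(&parent, &plan));
}
```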

@adriangb (Contributor Author) on Apr 22, 2025

This still needs some failing tests fixed, cleanup of the plethora of helper methods I added, and a lot of docs, but here's the idea: #15801. The key points are:

  • No downcast matching / hardcoding of implementations
  • Only recurses once / no retrying
  • Does no cloning / copying for branches that have no changes
  • Doesn't insert new operators
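
The "no cloning for unchanged branches" point can be sketched with a Transformed-style wrapper. The types below are stand-ins (DataFusion's tree-node API has a similar `Transformed<T>`, but this is not its exact shape): an unchanged subtree is handed back as-is rather than rebuilt.

```rust
// Minimal stand-in for a tree-rewrite result: the (possibly new) node plus
// a flag saying whether anything changed.
struct Transformed<T> {
    data: T,
    transformed: bool,
}

impl<T> Transformed<T> {
    fn yes(data: T) -> Self { Transformed { data, transformed: true } }
    fn no(data: T) -> Self { Transformed { data, transformed: false } }
}

// Hypothetical rewrite step: plans are strings here for illustration only.
fn push_down(plan: String, has_filter: bool) -> Transformed<String> {
    if has_filter {
        Transformed::yes(format!("{plan} [filtered]"))
    } else {
        // No change: return the original value, no rebuild of the branch.
        Transformed::no(plan)
    }
}

fn main() {
    let untouched = push_down("ScanExec".to_string(), false);
    println!("changed: {}, plan: {}", untouched.transformed, untouched.data);
    let rewritten = push_down("ScanExec".to_string(), true);
    println!("changed: {}, plan: {}", rewritten.transformed, rewritten.data);
}
```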

@adriangb (Contributor Author)

Pausing this until #15769 is done

@adriangb (Contributor Author)

> Pausing this until #15769 is done

I was able to unblock by wiring up to TestDataSource

@github-actions github-actions bot removed the datasource Changes to the datasource crate label Apr 19, 2025
Comment on lines 1224 to 1227
let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0]))
.with_fetch(self.fetch)
.with_preserve_partitioning(self.preserve_partitioning);
new_sort.filter = Arc::clone(&self.filter);
@adriangb (Contributor Author)

I missed this for a while and spent an hour trying to figure out why my test was failing. IMO we should have a test that enforces the invariant that ExecutionPlan::with_new_children(Arc::clone(&node), node.children()) == node
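A hedged sketch of such an invariant test, using stand-in types rather than the real ExecutionPlan API: rebuilding a node from its own children must yield an equal node, which fails precisely when a reconstruction method forgets to copy a field (as happened with `filter` here).

```rust
// Stand-in for a SortExec node; `filter` models the dynamic filter field
// that the real with_new_children originally dropped.
#[derive(Clone, PartialEq, Debug)]
struct SortNode {
    fetch: Option<usize>,
    preserve_partitioning: bool,
    filter: String,
}

impl SortNode {
    // Stand-in for with_new_children(Arc::clone(&node), node.children()):
    // a correct implementation must copy *every* field, including `filter`.
    fn with_new_children(&self) -> SortNode {
        SortNode {
            fetch: self.fetch,
            preserve_partitioning: self.preserve_partitioning,
            filter: self.filter.clone(),
        }
    }
}

fn main() {
    let node = SortNode {
        fetch: Some(10),
        preserve_partitioning: false,
        filter: "a < 5".to_string(),
    };
    // The invariant: round-tripping through with_new_children is identity.
    assert_eq!(node.with_new_children(), node);
    println!("invariant holds");
}
```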

@@ -22,7 +22,7 @@ mod binary;
mod case;
mod cast;
mod column;
-mod dynamic_filters;
+pub mod dynamic_filters;
@adriangb (Contributor Author)

This bit tripped me up. I'm not sure where the right place to put dynamic_filters is such that it's public for our internal use in operators but private from the outside world 🤔
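One option, sketched below with placeholder module names (not a claim about the actual crate layout): declare the module `pub(crate)` at its definition site, so sibling modules inside the crate can reach it while it stays out of the crate's public API.

```rust
mod physical_expr {
    // `pub(crate)` makes the module visible anywhere inside this crate,
    // but it never appears in the crate's public API surface.
    pub(crate) mod dynamic_filters {
        pub(crate) fn describe() -> &'static str {
            "crate-internal"
        }
    }
}

mod sort {
    // A sibling module (e.g. an operator like SortExec) can still use it.
    pub fn filter_kind() -> &'static str {
        crate::physical_expr::dynamic_filters::describe()
    }
}

fn main() {
    println!("{}", sort::filter_kind());
}
```

In a workspace with multiple crates this only works if the operators live in the same crate as the module, which may be the actual constraint here.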

@adriangb (Contributor Author)

@Dandandan I believe with this setup we should be able to achieve this with a couple LOC in insert_batch:

// Apply the filter to the batch before processing
let filter = Arc::clone(&self.filter) as Arc<dyn PhysicalExpr>;
let batch = filter_and_project(&batch, &filter, None, batch.schema_ref())?;
if batch.num_rows() == 0 {
    return Ok(());
}

(filter_and_project is from FilterExec, we just need to make it pub(crate))

@Dandandan (Contributor) on Apr 19, 2025

> @Dandandan I believe with this setup we should be able to achieve this with a couple LOC in insert_batch:
>
> // Apply the filter to the batch before processing
> let filter = Arc::clone(&self.filter) as Arc<dyn PhysicalExpr>;
> let batch = filter_and_project(&batch, &filter, None, batch.schema_ref())?;
> if batch.num_rows() == 0 {
>     return Ok(());
> }
>
> (filter_and_project is from FilterExec, we just need to make it pub(crate))

I think we probably want to avoid filtering the entire batch, but indeed, if the filter expression is available it will only be a couple LOC!
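The point about avoiding a full filter-and-copy could look like this sketch, which uses plain Vecs instead of Arrow arrays and a hypothetical `apply_dynamic_filter` helper: evaluate the predicate to a mask first, and only materialize a new batch when some rows are actually dropped.

```rust
// Stand-in for filtering a RecordBatch: `batch` is a single column and
// `mask` is the evaluated predicate. Returns None when the filter keeps
// every row, so the caller can reuse the original batch without a copy.
fn apply_dynamic_filter(batch: &[i64], mask: &[bool]) -> Option<Vec<i64>> {
    if mask.iter().all(|&keep| keep) {
        return None; // fast path: nothing dropped, no allocation
    }
    Some(
        batch
            .iter()
            .zip(mask)
            .filter(|(_, &keep)| keep)
            .map(|(v, _)| *v)
            .collect(),
    )
}

fn main() {
    // All rows pass: the original batch is kept as-is.
    assert_eq!(apply_dynamic_filter(&[1, 2, 3], &[true, true, true]), None);
    // Some rows dropped: a new, smaller batch is built.
    assert_eq!(
        apply_dynamic_filter(&[1, 2, 3], &[true, false, true]),
        Some(vec![1, 3])
    );
    println!("ok");
}
```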

@github-actions github-actions bot added the documentation Improvements or additions to documentation label Apr 19, 2025
@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Apr 19, 2025
"
);

// Actually apply the optimization to the plan
@adriangb (Contributor Author)

I recognize these diverge a bit from the other tests; happy to move them somewhere better.

@adriangb adriangb marked this pull request as ready for review April 20, 2025 00:48
@adriangb (Contributor Author)

Marking as ready for review despite not having any numbers to substantiate the performance improvement (because we need #15769). Given that we know, algorithmically and from experience with the previous PR, that this is a big win, it might be okay to merge without interlocking the PRs.

@berkaysynnada (Contributor)

@adriangb I'll complete reviewing this after merging the other open PRs.
