
[EPIC] Stop copying LogicalPlan during OptimizerPasses #9637

Closed
29 of 31 tasks
alamb opened this issue Mar 16, 2024 · 47 comments · Fixed by #10835
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Mar 16, 2024

Is your feature request related to a problem or challenge?

Broken out from #9577, where @mustafasrepo, @comphead, @jayzhan211, and I were discussing optimizer performance.

TL;DR: the DataFusion optimizer is slow. I did some profiling locally by running the following:

cargo bench --bench sql_planner -- physical_plan_tpch_all

My analysis is that almost 40% of the planning time is spent in SimplifyExprs and CommonSubexprEliminate, and from what I can tell, most of that time is related to copying expressions.

[Screenshot (2024-03-14): profiling results]

While those passes themselves internally make a bunch of clones, which we are improving (e.g. @jayzhan211 in #9628), I think there is a more fundamental structural problem.

I think a core challenge is that the OptimizerRule trait pretty much requires copying Exprs on each pass, as it gets a &LogicalPlan as input but produces a LogicalPlan as output:

    // Required methods
    fn try_optimize(
        &self,
        plan: &LogicalPlan,
        config: &dyn OptimizerConfig
    ) -> Result<Option<LogicalPlan>, DataFusionError>;

This means any pass that works on Exprs must clone all Exprs (by calling LogicalPlan::expressions()), rewrite them, and then create a new LogicalPlan with those new Exprs.
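
A minimal sketch of why the borrowed signature forces copies, using hypothetical toy types (`Expr`, `Plan`) and a thread-local clone counter, none of which are DataFusion APIs. The same rewrite is written once against `&Plan` and once against an owned `Plan`:

```rust
use std::cell::Cell;

thread_local! {
    // Toy instrumentation: counts deep clones of `Expr`
    static CLONES: Cell<usize> = Cell::new(0);
}

/// Hypothetical stand-in for an expression (not DataFusion's `Expr`)
#[derive(Debug, PartialEq)]
pub struct Expr(pub String);

impl Clone for Expr {
    fn clone(&self) -> Self {
        CLONES.with(|c| c.set(c.get() + 1));
        Expr(self.0.clone())
    }
}

/// Hypothetical stand-in for a plan node holding expressions
#[derive(Debug, Clone, PartialEq)]
pub struct Plan {
    pub exprs: Vec<Expr>,
}

/// Example rewrite applied to each expression
fn uppercase(e: Expr) -> Expr {
    Expr(e.0.to_uppercase())
}

/// Borrowed API (`&Plan` in, `Plan` out): every Expr must be cloned
/// before it can be rewritten into the new plan
pub fn rewrite_borrowed(plan: &Plan) -> Plan {
    Plan { exprs: plan.exprs.iter().cloned().map(uppercase).collect() }
}

/// Owned API (`Plan` in, `Plan` out): Exprs are moved, never cloned
pub fn rewrite_owned(plan: Plan) -> Plan {
    Plan { exprs: plan.exprs.into_iter().map(uppercase).collect() }
}

/// Number of `Expr` clones performed so far on this thread
pub fn clone_count() -> usize {
    CLONES.with(|c| c.get())
}
```

With two expressions, the borrowed version performs two `Expr` clones while the owned version performs none, which mirrors the per-pass cloning described above.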

Here is that pattern in the expression simplifier:

https://github.com/apache/arrow-datafusion/blob/0eec5f8e1d0f55e48f5cdc628fbb5ddd89b91512/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs#L112-L123

Describe the solution you'd like

Find some way to avoid cloning Exprs during LogicalPlan rewrites.

Update: here are the tasks:

Infrastructure Preparation

Update OptimizerRules to avoid copying

Update AnalyzerRules to avoid copying

Update Other to avoid copying

Describe alternatives you've considered

No response

Additional context

We have talked about various other ways to reduce copying of LogicalPlans, as well as their challenges, in other tickets:

@jayzhan211
Contributor

jayzhan211 commented Mar 16, 2024

I have two ideas before diving deep into the code:

  1. Rewrite to use an owned LogicalPlan; then we can easily have owned expressions. (This requires lots of code changes.)
  2. Clone lazily on rewrite, similar in spirit to copy-on-write. The idea from simplify() in "Add a ScalarUDFImpl::simplfy() API, move SimplifyInfo et al to datafusion_expr #9304" may help here too, or we can rely on Transformed<Expr>:
enum SimplifiedExpr {
  Simplified(Expr),
  Original,
}

fn simplify_expr(&self, expr: &Expr) -> SimplifiedExpr;

If the result is Original, we can avoid the clone.
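
A runnable sketch of option 2, assuming a hypothetical toy `Expr` (columns, integer literals, and `+`) rather than DataFusion's: `simplify_expr` borrows its input and reports `Original` when nothing changed, so the caller can keep the existing expression instead of cloning it.

```rust
/// Hypothetical toy expression type (not DataFusion's `Expr`)
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Col(String),
    Lit(i64),
    Add(Box<Expr>, Box<Expr>),
}

/// Either a newly built expression or a marker that nothing changed
#[derive(Debug, PartialEq)]
pub enum SimplifiedExpr {
    Simplified(Expr),
    Original,
}

fn is_zero(e: &Expr) -> bool {
    matches!(e, Expr::Lit(0))
}

/// Turns a child's result into an owned Expr, cloning only if it was unchanged
fn into_owned(result: SimplifiedExpr, original: &Expr) -> Expr {
    match result {
        SimplifiedExpr::Simplified(e) => e,
        SimplifiedExpr::Original => original.clone(),
    }
}

/// Simplifies `e + 0` and `0 + e` to `e`, bottom-up
pub fn simplify_expr(expr: &Expr) -> SimplifiedExpr {
    let Expr::Add(l, r) = expr else {
        // Columns and literals are never rewritten: no clone at all
        return SimplifiedExpr::Original;
    };
    let l_res = simplify_expr(l);
    let r_res = simplify_expr(r);
    let children_unchanged =
        l_res == SimplifiedExpr::Original && r_res == SimplifiedExpr::Original;
    if children_unchanged && !is_zero(l) && !is_zero(r) {
        // Nothing to do anywhere in this subtree
        return SimplifiedExpr::Original;
    }
    // Something changes, so build owned children and rewrite
    let out = match (into_owned(l_res, l), into_owned(r_res, r)) {
        (Expr::Lit(0), e) | (e, Expr::Lit(0)) => e,
        (a, b) => Expr::Add(Box::new(a), Box::new(b)),
    };
    SimplifiedExpr::Simplified(out)
}
```

Only a branch that actually rewrites something pays for a clone; an expression with nothing to simplify is reported as `Original` and can be reused as-is.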

@alamb
Contributor Author

alamb commented Mar 16, 2024

> I have two ideas before diving deep into the code

I think option 2 sounds interesting.

Another thing I was thinking about was something like LogicalPlan::rewrite(mut self, rewriter).

I think that, along with Arc::try_unwrap, could be used to minimize the places where cloning is actually needed.
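
A minimal sketch of the owned-rewrite idea, assuming a hypothetical toy `Plan` whose children sit in an `Arc` (loosely like `LogicalPlan`); `rewrite` and `unwrap_or_clone` here are illustrative names, not DataFusion APIs. `Arc::try_unwrap` moves the child out for free when this is the only reference, and we fall back to a deep clone only when it is shared:

```rust
use std::sync::Arc;

/// Hypothetical toy plan with `Arc`-stored children
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan(String),
    Filter { predicate: String, input: Arc<Plan> },
}

/// Takes the child out of its `Arc` without copying if uniquely owned;
/// otherwise clones the shared value
fn unwrap_or_clone(plan: Arc<Plan>) -> Plan {
    Arc::try_unwrap(plan).unwrap_or_else(|shared| (*shared).clone())
}

impl Plan {
    /// Owned, bottom-up rewrite: children first, then `f` on this node
    pub fn rewrite<F: Fn(Plan) -> Plan>(self, f: &F) -> Plan {
        let rewritten = match self {
            Plan::Filter { predicate, input } => {
                let child = unwrap_or_clone(input).rewrite(f);
                Plan::Filter { predicate, input: Arc::new(child) }
            }
            leaf => leaf,
        };
        f(rewritten)
    }
}
```

Recent Rust versions (1.76+) also provide `Arc::unwrap_or_clone`, which does the same thing as the helper. A shared child is left untouched for its other owners.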

Maybe we can prototype with:

trait OptimizerRule {
...

  /// Does this rule support rewriting owned plans?
  fn supports_owned(&self) -> bool { false }

  /// If supports_owned returns true, the optimizer calls try_optimize_owned instead of try_optimize
  fn try_optimize_owned(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig
    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
      not_implemented_err!("try_optimize_owned is not implemented for this rule")
    }
...
}

And then play around with the code that calls try_optimize here https://github.com/apache/arrow-datafusion/blob/f4107d47bb4c0260d301294ddfc7c67d96574636/datafusion/optimizer/src/optimizer.rs#L360-L368 to try using the try_optimize_owned API for SimplifyExprs (without having to rewrite all the optimizers).

If we could show a significant performance improvement for the sql_planner benchmarks, then I think it would be worth spending time reworking the other APIs.
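
A toy version of how the driver could dispatch between the two APIs, assuming hypothetical simplified signatures (no `OptimizerConfig`, and a plain `Result<Plan, String>` instead of `Result<Transformed<LogicalPlan>>`). `DropEmpty` and `Uppercase` are made-up example rules:

```rust
/// Hypothetical toy plan: a list of expression strings
#[derive(Debug, Clone, PartialEq)]
pub struct Plan(pub Vec<String>);

/// Hypothetical, simplified version of the proposed trait extension
pub trait OptimizerRule {
    /// Existing-style API: borrow the plan, maybe return a new one
    fn try_optimize(&self, plan: &Plan) -> Option<Plan>;

    /// Does this rule support rewriting owned plans?
    fn supports_owned(&self) -> bool {
        false
    }

    /// Owned API, only called when `supports_owned` returns true
    fn try_optimize_owned(&self, _plan: Plan) -> Result<Plan, String> {
        Err("try_optimize_owned is not implemented for this rule".to_string())
    }
}

/// Driver: take the owned path when available, otherwise the old one
pub fn apply_rule(rule: &dyn OptimizerRule, plan: Plan) -> Result<Plan, String> {
    if rule.supports_owned() {
        rule.try_optimize_owned(plan)
    } else {
        Ok(rule.try_optimize(&plan).unwrap_or(plan))
    }
}

/// Old-style rule: has to clone the expressions it keeps
pub struct DropEmpty;

impl OptimizerRule for DropEmpty {
    fn try_optimize(&self, plan: &Plan) -> Option<Plan> {
        if plan.0.iter().all(|e| !e.is_empty()) {
            return None; // unchanged
        }
        Some(Plan(plan.0.iter().filter(|e| !e.is_empty()).cloned().collect()))
    }
}

/// New-style rule: mutates the owned plan directly, no clones
pub struct Uppercase;

impl OptimizerRule for Uppercase {
    fn try_optimize(&self, plan: &Plan) -> Option<Plan> {
        // Still required by the trait; it copies, so the driver avoids it
        Some(Plan(plan.0.iter().map(|e| e.to_ascii_uppercase()).collect()))
    }

    fn supports_owned(&self) -> bool {
        true
    }

    fn try_optimize_owned(&self, mut plan: Plan) -> Result<Plan, String> {
        plan.0.iter_mut().for_each(|e| e.make_ascii_uppercase());
        Ok(plan)
    }
}
```

Because the new methods have defaults, existing rules keep compiling unchanged, and rules can opt in to the owned path one at a time.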

@jayzhan211
Contributor

jayzhan211 commented Mar 17, 2024

@alamb I think being able to skip failed rules is the main challenge to rewriting the plan with ownership.

https://github.com/apache/arrow-datafusion/blob/37253e57beb25f0f1a4412b75421a489c2cb3c6a/datafusion/optimizer/src/optimizer.rs#L325

We need to restore the original plan if the rewrite fails at some point; however, the old plan has been consumed and we have lost ownership of it. Giving up ownership too early is not a good idea if we need to restore the original plan.

Instead of Transformed, I think we need to preserve the old data and a failure state, mimicking Err in Result but keeping the old data:

#[derive(Debug, PartialEq, Eq)]
pub enum OptimizedState {
    Yes,
    No,
    Fail,
}

#[derive(Debug)]
pub struct Optimized<T, E = DataFusionError> {
    // The rewritten data, if the optimization succeeded
    pub optimized_data: Option<T>,
    // The original data, always kept so it can be restored
    pub original_data: T,
    pub optimized_state: OptimizedState,
    // Used to store the error if the optimization failed, so we can return early but preserve the original data
    pub error: Option<E>,
}

impl<T, E> Optimized<T, E> {
    pub fn yes(optimized_data: T, original_data: T) -> Self {
        Self {
            optimized_data: Some(optimized_data),
            original_data,
            optimized_state: OptimizedState::Yes,
            error: None,
        }
    }

    pub fn no(original_data: T) -> Self {
        Self {
            optimized_data: None,
            original_data,
            optimized_state: OptimizedState::No,
            error: None,
        }
    }

    pub fn fail(original_data: T, e: E) -> Self {
        Self {
            optimized_data: None,
            original_data,
            optimized_state: OptimizedState::Fail,
            error: Some(e),
        }
    }
}

Then, every optimization returns Optimized<LogicalPlan> or Optimized<Expr>.
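
One variation on this idea, sketched with a hypothetical toy `Plan` and rule `divide_all`: instead of storing the original alongside the result, the rule hands the untouched plan back inside its error. This assumes the rule can detect failure before it starts modifying the plan; repairing a partially rewritten plan is a harder problem.

```rust
/// Hypothetical toy plan
#[derive(Debug, Clone, PartialEq)]
pub struct Plan(pub Vec<i64>);

/// Hypothetical rule that consumes the plan but returns ownership of the
/// unmodified plan together with the error when it cannot proceed
pub fn divide_all(plan: Plan, divisor: i64) -> Result<Plan, (Plan, String)> {
    if divisor == 0 {
        // Nothing has been modified yet, so give the plan back
        return Err((plan, "division by zero".to_string()));
    }
    let Plan(mut values) = plan;
    for v in values.iter_mut() {
        *v /= divisor;
    }
    Ok(Plan(values))
}

/// Driver with skip_failed_rules semantics and no up-front clone
pub fn run(plan: Plan, divisor: i64, skip_failed_rules: bool) -> Result<Plan, String> {
    match divide_all(plan, divisor) {
        Ok(new_plan) => Ok(new_plan),
        // Restore the original plan handed back by the rule
        Err((original, _err)) if skip_failed_rules => Ok(original),
        Err((_, err)) => Err(err),
    }
}
```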

@alamb
Contributor Author

alamb commented Mar 17, 2024

> @alamb I think being able to skip failed rules is the main challenge to rewriting the plan with ownership

This is an excellent point.

> Instead of Transformed, I think we need to preserve the old data and fail state to mimic Err in Result but with old data

If we need to preserve the old data, there is no choice but to copy the LogicalPlan on each pass 🤔 (well, I guess there could be some way to repair the partially rewritten plan, but that sounds very complicated).

Since skip_failed_rules is false by default:

https://github.com/apache/arrow-datafusion/blob/dcfe70987e98da0410146b5e1292ab20f3f118e0/datafusion/common/src/config.rs#L549

Could we maybe only do the copy when skip_failed_rules is enabled? Something like this:

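let new_plan = if skip_failed_rules {
  // keep a copy so the original can be restored if the rule fails
  let original_plan = plan.clone();
  optimizer.try_optimize_owned(plan)
    .map(|transformed| transformed.data)
    .unwrap_or(original_plan)
} else {
  optimizer.try_optimize_owned(plan)?.data
};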
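
A runnable toy of this conditional copy, using a hypothetical `Plan` and a hypothetical rule `rename_all` that can fail after it has already modified part of the plan, which is exactly the case where the copy is needed:

```rust
/// Hypothetical toy plan
#[derive(Debug, Clone, PartialEq)]
pub struct Plan(pub Vec<String>);

/// Hypothetical rule: renames every expression, but fails on "bad",
/// possibly after earlier expressions were already modified
pub fn rename_all(mut plan: Plan) -> Result<Plan, String> {
    for e in plan.0.iter_mut() {
        if e.as_str() == "bad" {
            return Err(format!("cannot rename {e}"));
        }
        e.push_str("_renamed");
    }
    Ok(plan)
}

/// Copies the plan only when skip_failed_rules might need to restore it
pub fn optimize(plan: Plan, skip_failed_rules: bool) -> Result<Plan, String> {
    if skip_failed_rules {
        let original_plan = plan.clone();
        Ok(rename_all(plan).unwrap_or(original_plan))
    } else {
        // Default path: no copy, errors propagate
        rename_all(plan)
    }
}
```

With `skip_failed_rules` enabled, the partially renamed plan is discarded and the original is returned; on the default path no copy is made and the error propagates.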

@jayzhan211
Contributor

I think it is possible to avoid unnecessary clones even if we preserve the old plan, cloning only the data for the new expressions and new plan, but I agree it is a little complicated; it needs the enum I showed above. We can optimize the default path first and handle the others later.

@alamb
Contributor Author

alamb commented Mar 17, 2024

Let's give it a try!

@jayzhan211
Contributor

Note: I profiled my change and found that the time does not improve at all. I found that the clones inside exprlist_to_fields may be the core issue.

@jayzhan211
Contributor

jayzhan211 commented Mar 20, 2024

I analyzed the sql_planner benchmark again and found that exprlist_to_fields and calc_func_dependencies_for_project are the two most time-consuming functions.

So I think #9595 might be an important step toward the optimization here.

https://github.com/apache/arrow-datafusion/blob/ad8d552b9f150c3c066b0764e84f72b667a649ff/datafusion/expr/src/logical_plan/plan.rs#L1804-L1813

@alamb
Contributor Author

alamb commented Mar 20, 2024

I played around with some ideas last night and it is looking promising (though not yet done). I put my draft in #9708. I hope to work on it more over the next few days, but I am pretty busy preparing for meetups / presentations / papers for next week, so it may be a few more days.

@jayzhan211
Contributor

I would be surprised if #9708 improves things; let me see if I can improve #9658 based on it.

@alamb
Contributor Author

alamb commented Mar 20, 2024

BTW, here is a video of how I profile DataFusion: #9577 (comment)

@universalmind303
Contributor

universalmind303 commented Mar 20, 2024

I'm curious: it seems like most of the reasoning behind Arc<LogicalPlan> and clone is due to the optimizer stage. Polars uses an intermediate representation, ALogicalPlan (arena-allocated logical plan), solely for the optimizer stage. I wonder if the same technique could be applied here.

@alamb
Contributor Author

alamb commented Mar 20, 2024

I think arena allocation is pretty common in various compiler/query optimizer systems, because you know a plan doesn't outlive the optimizer pass (as in, you can bound the lifetime of the intermediate results).

I think Rust is actually ideally set up to avoid arena allocation (which still requires copying things into a new plan) by using ownership; we just need to rejigger how DataFusion is set up to use it. I think we are close.

@jayzhan211
Contributor

jayzhan211 commented Mar 21, 2024

One issue is that we are not able to call Arc::into_inner on the skip-failed-rules path, since we hold a clone for restoration.
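
This happens because cloning a plan whose children live in an `Arc` only copies the pointer, so each child ends up with two strong references. A small illustration with a hypothetical `Node` type (`Arc::into_inner` requires Rust 1.70 or later):

```rust
use std::sync::Arc;

/// Hypothetical toy plan node; the input lives in an `Arc`, like `LogicalPlan` children
#[derive(Debug, Clone, PartialEq)]
pub struct Node {
    pub name: String,
    pub input: Option<Arc<Node>>,
}

/// Tries to move the child out without copying it. `Arc::into_inner`
/// returns `None` (and drops this reference) when the child is shared.
pub fn take_child(node: Node) -> Option<Node> {
    node.input.and_then(|child| Arc::into_inner(child))
}
```

While a backup clone is alive, `take_child` on the working copy returns `None`; once the working copy has been dropped, the backup is the sole owner and its child can be moved out for free.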

@alamb
Contributor Author

alamb commented Mar 21, 2024

> There is one issue that we are not able to call Arc::into_inner for skip-failed-rules path since we hold a clone for restoration.

What I did in #9708, which seems to have worked pretty well, is to copy the LogicalPlan only if skip_failed_rules is set.

Like this:

                let prev_plan = if options.optimizer.skip_failed_rules {
                    Some(new_plan.clone())
                } else {
                    None
                };

@jayzhan211
Contributor

jayzhan211 commented Mar 21, 2024 via email

@alamb
Contributor Author

alamb commented Mar 21, 2024

> But I played around without mut before, maybe there are ways to solve this

Yes, this is indeed the case. The code in #9708 is pretty terrible / hacky in how it deals with the Arc'd inputs, but it does seem to work:

@alamb
Contributor Author

alamb commented Mar 21, 2024

With some somewhat hacky code in #9708 (comment), I saw a 25% performance improvement.

Given the results I saw in #9708, here is my proposal:

  1. Update the Optimizer implementation / Optimizer APIs to rewrite existing LogicalPlans rather than make new ones
  2. Add the necessary APIs to LogicalPlan
  3. Contemplate eventually switching to Box<LogicalPlan> instead of Arc<LogicalPlan> to store children in LogicalPlan

I am not sure which of these APIs would be better: rewriting in place (via &mut plan) or taking ownership (via plan). I feel like the first one would likely be faster (as it avoids copying entirely), but the second is easier to reason about 🤔

Rewrite in place (via &mut plan):

trait OptimizerRule {
...
  /// Does this rule support rewriting plans in place rather than copying them?
  fn supports_mut(&self) -> bool { false }

  /// If supports_mut returns true, rewrites `plan` in place
  fn optimize_mut(
        &self,
        plan: &mut LogicalPlan,
        config: &dyn OptimizerConfig
    ) -> Result<Transformed<()>, DataFusionError> {
      not_implemented_err!("optimize_mut is not implemented for this rule")
    }
...
}

Take ownership (via plan):

trait OptimizerRule {
...

  /// Does this rule support rewriting owned plans?
  fn supports_owned(&self) -> bool { false }

  /// If supports_owned returns true, rewrites the LogicalPlan, returning the newly rewritten plan
  fn try_optimize_owned(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig
    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
      not_implemented_err!("try_optimize_owned is not implemented for this rule")
    }
...
}
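
To compare the two shapes concretely, here is a toy sketch with a hypothetical `Plan`; a `bool` stands in for the `Transformed` flag, and deduplicating expressions is just an example rewrite. It also shows that an owned API can be layered on top of an in-place one with no copying:

```rust
/// Hypothetical toy plan
#[derive(Debug, Clone, PartialEq)]
pub struct Plan {
    pub exprs: Vec<String>,
}

/// In-place style (like `optimize_mut`): mutate through `&mut`,
/// returning whether anything changed (akin to `Transformed<()>`)
pub fn optimize_mut(plan: &mut Plan) -> bool {
    let before = plan.exprs.len();
    // Example rewrite: drop consecutive duplicate expressions
    plan.exprs.dedup();
    plan.exprs.len() != before
}

/// Owned style (like `try_optimize_owned`): consume the plan and return
/// it together with the "changed" flag (akin to `Transformed<LogicalPlan>`)
pub fn optimize_owned(mut plan: Plan) -> (Plan, bool) {
    // An owned plan can always be mutated in place before being returned
    let changed = optimize_mut(&mut plan);
    (plan, changed)
}
```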

@jayzhan211
Contributor

jayzhan211 commented Mar 24, 2024

Update: I realized we need to do the std::mem::take trick for Box too. 😢
Should we change to Box first, so we can easily modify the plan without hacks?

> (rewrite in place via &mut plan or take ownership (via plan))

I think we can take ownership by default, and mutate in place where that is better? For example, switch to mutable rewriting for new expressions and new plans.
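
To apply an owned rewrite to a child that is reachable only through `&mut`, the child must first be moved out, leaving a placeholder behind. A sketch of the `std::mem::take` technique for a `Box` child, assuming a hypothetical toy `Plan` with a `Limit` node and an `Empty` variant used as the placeholder (the limit-merging rewrite itself is only an example):

```rust
/// Hypothetical toy plan with a boxed child
#[derive(Debug, Clone, PartialEq, Default)]
pub enum Plan {
    /// Placeholder left behind by `std::mem::take`
    #[default]
    Empty,
    Scan(String),
    Limit { n: usize, input: Box<Plan> },
}

/// An owned rewrite: collapses nested limits, keeping the smaller `n`
pub fn merge_limits(plan: Plan) -> Plan {
    match plan {
        Plan::Limit { n, input } => match *input {
            Plan::Limit { n: inner, input } => merge_limits(Plan::Limit { n: n.min(inner), input }),
            other => Plan::Limit { n, input: Box::new(other) },
        },
        other => other,
    }
}

/// Applies the owned rewrite to the child of `parent`: move the child out
/// with `std::mem::take` (leaving `Plan::Empty`), rewrite it, write it back
pub fn rewrite_child_in_place(parent: &mut Plan) {
    if let Plan::Limit { input, .. } = parent {
        let child = std::mem::take(&mut **input);
        **input = merge_limits(child);
    }
}
```

The placeholder is only visible between the `take` and the write-back, but it does mean the plan type needs some cheap `Default` value.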

@alamb
Contributor Author

alamb commented Mar 24, 2024

> Should we change to box first, so we can easily modify plan without hack?

I was exploring keeping the Arc initially for two reasons:

  1. It avoids breaking changes in downstream crates
  2. Until we fix the optimizer so it doesn't do so much copying, switching to Box will only make the problem worse (as all clones will then do deep clones of all children).

So I was thinking we could start with the hacky solution that keeps Arc<LogicalPlan> and then potentially switch to Box<LogicalPlan> afterwards.
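
A small illustration of the clone-cost difference, using two hypothetical toy plan types that differ only in how the child is stored: cloning an `Arc` child shares the same allocation, while cloning a `Box` child deep-copies it.

```rust
use std::sync::Arc;

/// Hypothetical toy plan storing its child in an `Arc`
#[derive(Debug, Clone)]
pub struct ArcPlan {
    pub exprs: Vec<String>,
    pub input: Option<Arc<ArcPlan>>,
}

/// Hypothetical toy plan storing its child in a `Box`
#[derive(Debug, Clone)]
pub struct BoxPlan {
    pub exprs: Vec<String>,
    pub input: Option<Box<BoxPlan>>,
}

/// True if both plans point at the very same child allocation
pub fn arc_children_shared(a: &ArcPlan, b: &ArcPlan) -> bool {
    match (&a.input, &b.input) {
        (Some(x), Some(y)) => Arc::ptr_eq(x, y),
        _ => false,
    }
}

/// True if both plans point at the very same child allocation
pub fn box_children_shared(a: &BoxPlan, b: &BoxPlan) -> bool {
    match (&a.input, &b.input) {
        (Some(x), Some(y)) => std::ptr::eq(&**x, &**y),
        _ => false,
    }
}
```

After `clone()`, the `Arc` version still shares its child (only a reference count changed), while the `Box` version has copied the whole subtree, which is why switching to `Box` before removing the clones would make copying more expensive.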

@alamb
Contributor Author

alamb commented Apr 29, 2024

I filed tickets for the other optimizer passes and linked them in the description of this ticket.

@alamb
Contributor Author

alamb commented May 17, 2024

Update: this issue is done other than:

  1. Stop copying LogicalPlan and Exprs in SingleDistinctToGroupBy #10295 (which @appletreeisyellow is working on)
  2. Stop copying Exprs and LogicalPlans so much during Common Subexpression Elimination #9873, which I will do (but I am currently waiting for @peter-toth's work in Rewrite CommonSubexprEliminate to avoid copies using TreeNode #10067 to complete)
