
Moving cost based optimizations to physical planning #962

@rdettai

Description

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Currently, some cost-based optimizations are executed during logical plan optimization. This breaks the intent of separating the logical and physical plans: the logical plan is usually optimized with rule-based optimizations only, and cost-based optimizations should kick in during physical plan selection.

A more concrete reason to move all cost-based optimizations further down the pipeline is to avoid fetching statistics when building the logical plan. In Ballista, for example, the logical plan is built in the client, which might be off-cluster and thus should not be required to have access to the datastore (or at least not the high-performance access that is sometimes needed to fetch statistics).

Describe the solution you'd like

  • Remove statistics() and has_exact_statistics() from the TableProvider trait
  • Add a statistics() method to the ExecutionPlan trait (a minimal sketch of the proposed shape follows this list)
    • Each node of the physical plan will assess its output statistics itself, according to its inputs and its internal logic. This is easier to maintain than the current approach, where we try to reconstruct the statistics of the nodes externally (e.g. in hash_build_probe_order.rs -> get_num_rows(&LogicalPlan)).
    • The returned type will be the same as the current Statistics struct, except that we use a boolean field is_exact instead of having a separate has_exact_statistics() method
    • The method is sync and takes an immutable &self. Some ExecutionPlan nodes might want to fetch the latest statistics when queried; a good example is ShuffleReaderExec, which might want to recompute the statistics once all its input partitions are resolved. We consider the execution plan nodes to be mostly immutable, so updating the statistics requires rewriting the whole physical plan tree above the updated node.
    • [can be postponed to a separate PR] Make the TableProvider.scan(...) method async to allow the computation of the statistics when creating the source ExecutionPlan nodes. This will require async propagation to the PhysicalPlanner trait.
    • The AggregateStatistics rule (use the statistics directly to provide the aggregation result when possible) could be applied:
      • By the HashAggregateExec itself when being constructed
      • During the physical plan optimization
    • The HashBuildProbeOrder rule (optimize the join ordering according to table sizes) could be applied:
      • By the HashJoinExec and CrossJoinExec themselves when being constructed
      • During the physical plan optimization; this would give better support for Spark-style AQE (see the sketch after this list)
  • [can be postponed to a separate PR] Add an eval_stats(&self, &Statistics) method to the PhysicalExpr trait so that we can also propagate statistics through complex expressions (a rough sketch is included below)
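
To make the proposal more concrete, here is a minimal sketch of the shapes described above, assuming illustrative names (the LimitExec example and its fields are made up for this sketch, not the final DataFusion API):

```rust
use std::sync::Arc;

/// Illustrative Statistics shape: exactness becomes a flag instead of a
/// separate has_exact_statistics() method.
#[derive(Debug, Clone, Default)]
pub struct Statistics {
    pub num_rows: Option<usize>,
    pub total_byte_size: Option<usize>,
    /// true when the values above are exact rather than estimates
    pub is_exact: bool,
}

pub trait ExecutionPlan: Send + Sync {
    /// Each node derives its output statistics from its inputs and its own
    /// logic. The call is sync and takes &self: a node that wants fresher
    /// numbers (e.g. ShuffleReaderExec) is rebuilt, together with the plan
    /// tree above it.
    fn statistics(&self) -> Statistics;

    // ... the existing methods (schema, children, execute, ...) stay as-is
}

/// Example: a limit node can cap the row count reported by its input.
pub struct LimitExec {
    pub input: Arc<dyn ExecutionPlan>,
    pub limit: usize,
}

impl ExecutionPlan for LimitExec {
    fn statistics(&self) -> Statistics {
        let input = self.input.statistics();
        Statistics {
            num_rows: input.num_rows.map(|n| n.min(self.limit)),
            // the output byte size is no longer known after applying the limit
            total_byte_size: None,
            is_exact: input.is_exact,
        }
    }
}
```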
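
With statistics() on ExecutionPlan, both rules become simple consumers of the physical inputs. Below is a rough sketch of the two decisions, reusing the Statistics and ExecutionPlan shapes from the previous sketch; the helper names are made up for illustration:

```rust
/// AggregateStatistics idea: a plain COUNT(*) can be answered from the
/// statistics alone, but only when the input reports them as exact.
fn count_from_stats(input: &dyn ExecutionPlan) -> Option<usize> {
    let stats = input.statistics();
    if stats.is_exact {
        stats.num_rows
    } else {
        None
    }
}

/// HashBuildProbeOrder idea: read the row counts straight from the physical
/// inputs instead of re-deriving them from the LogicalPlan as
/// hash_build_probe_order.rs does today. Returns true when the inputs should
/// be swapped so that the hash table is built from the smaller side.
fn should_swap_join_inputs(build: &dyn ExecutionPlan, probe: &dyn ExecutionPlan) -> bool {
    match (build.statistics().num_rows, probe.statistics().num_rows) {
        (Some(build_rows), Some(probe_rows)) => probe_rows < build_rows,
        // without estimates on both sides, keep the planner's original order
        _ => false,
    }
}
```

Whether these decisions run when the nodes are constructed or in a dedicated physical optimization pass, the information they need is the same.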
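
For the PhysicalExpr extension, the signature could look roughly like the following (again reusing the Statistics struct sketched earlier; this is an assumption about the eventual shape, not a final design):

```rust
/// Rough sketch: an expression derives the statistics of its output from the
/// statistics of the relation it is evaluated against.
pub trait PhysicalExpr: Send + Sync {
    fn eval_stats(&self, input: &Statistics) -> Statistics;

    // ... the existing methods stay as-is
}

/// A plain column reference leaves the relation-level statistics untouched.
pub struct Column;

impl PhysicalExpr for Column {
    fn eval_stats(&self, input: &Statistics) -> Statistics {
        input.clone()
    }
}
```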

Describe alternatives you've considered
It was considered to have async fn statistics(&mut self) to support fetching the latest statistics, or simply fn statistics(&self). Fetching the latest statistics is important to support Spark-style AQE, but it can also be achieved by keeping the physical plan nodes immutable and rewriting the entire plan tree when necessary.

It would also be valuable to have the statistics at the partition level. This should be considered in a further evolution.

Additional context
The need to move the CBO to the execution plan is linked to problems such as the implementation of table formats.

It is also related to the issue of the statistics being read multiple times, raised in Ballista:

Features for which the implementation will likely create merge conflicts:
