Skip to content

Commit 3c31339

Browse files
jackwenergandronchik
authored andcommitted
refactor: add optimzer struct (apache#2616)
1 parent 137a9f3 commit 3c31339

File tree

4 files changed

+62
-207
lines changed

4 files changed

+62
-207
lines changed

datafusion/core/src/execution/context.rs

Lines changed: 19 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,15 @@ use crate::{
3636
plan::{CreateCatalogSchema, CreateExternalTable},
3737
PlanType, ToStringifiedPlan,
3838
},
39-
optimizer::eliminate_filter::EliminateFilter,
40-
optimizer::eliminate_limit::EliminateLimit,
39+
optimizer::{
40+
eliminate_filter::EliminateFilter, eliminate_limit::EliminateLimit,
41+
optimizer::Optimizer,
42+
},
4143
physical_optimizer::{
4244
aggregate_statistics::AggregateStatistics,
4345
hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule,
4446
},
4547
};
46-
use log::{debug, trace};
4748
use parking_lot::RwLock;
4849
use std::collections::{HashMap, HashSet};
4950
use std::string::String;
@@ -71,7 +72,6 @@ use crate::optimizer::optimizer::OptimizerRule;
7172
use crate::optimizer::projection_push_down::ProjectionPushDown;
7273
use crate::optimizer::simplify_expressions::SimplifyExpressions;
7374
use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy;
74-
use crate::optimizer::to_approx_perc::ToApproxPerc;
7575

7676
use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
7777
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
@@ -1084,7 +1084,7 @@ pub struct SessionState {
10841084
/// Uuid for the session
10851085
pub session_id: String,
10861086
/// Responsible for optimizing a logical plan
1087-
pub optimizers: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
1087+
pub optimizer: Optimizer,
10881088
/// Responsible for optimizing a physical execution plan
10891089
pub physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
10901090
/// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
@@ -1143,7 +1143,7 @@ impl SessionState {
11431143

11441144
SessionState {
11451145
session_id,
1146-
optimizers: vec![
1146+
optimizer: Optimizer::new(vec![
11471147
// Simplify expressions first to maximize the chance
11481148
// of applying other optimizations
11491149
Arc::new(SimplifyExpressions::new()),
@@ -1154,11 +1154,7 @@ impl SessionState {
11541154
Arc::new(FilterPushDown::new()),
11551155
Arc::new(LimitPushDown::new()),
11561156
Arc::new(SingleDistinctToGroupBy::new()),
1157-
// ToApproxPerc must be applied last because
1158-
// it rewrites only the function and may interfere with
1159-
// other rules
1160-
Arc::new(ToApproxPerc::new()),
1161-
],
1157+
]),
11621158
physical_optimizers: vec![
11631159
Arc::new(AggregateStatistics::new()),
11641160
Arc::new(HashBuildProbeOrder::new()),
@@ -1227,9 +1223,9 @@ impl SessionState {
12271223
/// Replace the optimizer rules
12281224
pub fn with_optimizer_rules(
12291225
mut self,
1230-
optimizers: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
1226+
rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
12311227
) -> Self {
1232-
self.optimizers = optimizers;
1228+
self.optimizer = Optimizer::new(rules);
12331229
self
12341230
}
12351231

@@ -1247,7 +1243,7 @@ impl SessionState {
12471243
mut self,
12481244
optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
12491245
) -> Self {
1250-
self.optimizers.push(optimizer_rule);
1246+
self.optimizer.rules.push(optimizer_rule);
12511247
self
12521248
}
12531249

@@ -1262,16 +1258,21 @@ impl SessionState {
12621258

12631259
/// Optimizes the logical plan by applying optimizer rules.
12641260
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
1261+
let execution_props = &mut self.execution_props.clone();
1262+
12651263
if let LogicalPlan::Explain(e) = plan {
12661264
let mut stringified_plans = e.stringified_plans.clone();
12671265

12681266
// optimize the child plan, capturing the output of each optimizer
1269-
let plan =
1270-
self.optimize_internal(e.plan.as_ref(), |optimized_plan, optimizer| {
1267+
let plan = self.optimizer.optimize(
1268+
e.plan.as_ref(),
1269+
execution_props,
1270+
|optimized_plan, optimizer| {
12711271
let optimizer_name = optimizer.name().to_string();
12721272
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
12731273
stringified_plans.push(optimized_plan.to_stringified(plan_type));
1274-
})?;
1274+
},
1275+
)?;
12751276

12761277
Ok(LogicalPlan::Explain(Explain {
12771278
verbose: e.verbose,
@@ -1280,35 +1281,8 @@ impl SessionState {
12801281
schema: e.schema.clone(),
12811282
}))
12821283
} else {
1283-
self.optimize_internal(plan, |_, _| {})
1284-
}
1285-
}
1286-
1287-
/// Optimizes the logical plan by applying optimizer rules, and
1288-
/// invoking observer function after each call
1289-
fn optimize_internal<F>(
1290-
&self,
1291-
plan: &LogicalPlan,
1292-
mut observer: F,
1293-
) -> Result<LogicalPlan>
1294-
where
1295-
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
1296-
{
1297-
let execution_props = &mut self.execution_props.clone();
1298-
let optimizers = &self.optimizers;
1299-
1300-
let execution_props = execution_props.start_execution();
1301-
1302-
let mut new_plan = plan.clone();
1303-
debug!("Input logical plan:\n{}\n", plan.display_indent());
1304-
trace!("Full input logical plan:\n{:?}", plan);
1305-
for optimizer in optimizers {
1306-
new_plan = optimizer.optimize(&new_plan, execution_props)?;
1307-
observer(&new_plan, optimizer.as_ref());
1284+
self.optimizer.optimize(plan, execution_props, |_, _| {})
13081285
}
1309-
debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
1310-
trace!("Full Optimized logical plan:\n {:?}", plan);
1311-
Ok(new_plan)
13121286
}
13131287

13141288
/// Creates a physical plan from a logical plan.

datafusion/core/src/optimizer/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,4 @@ pub mod optimizer;
2828
pub mod projection_push_down;
2929
pub mod simplify_expressions;
3030
pub mod single_distinct_to_groupby;
31-
pub mod to_approx_perc;
3231
pub mod utils;

datafusion/core/src/optimizer/optimizer.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717

1818
//! Query optimizer traits
1919
20+
use std::sync::Arc;
21+
22+
use log::{debug, trace};
23+
2024
use crate::error::Result;
2125
use crate::execution::context::ExecutionProps;
2226
use crate::logical_plan::LogicalPlan;
@@ -35,3 +39,42 @@ pub trait OptimizerRule {
3539
/// A human readable name for this optimizer rule
3640
fn name(&self) -> &str;
3741
}
42+
43+
/// A rule-based optimizer.
44+
#[derive(Clone)]
45+
pub struct Optimizer {
46+
/// All rules to apply
47+
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
48+
}
49+
50+
impl Optimizer {
51+
/// Create a new optimizer with the given rules
52+
pub fn new(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
53+
Self { rules }
54+
}
55+
56+
/// Optimizes the logical plan by applying optimizer rules, and
57+
/// invoking observer function after each call
58+
pub fn optimize<F>(
59+
&self,
60+
plan: &LogicalPlan,
61+
execution_props: &mut ExecutionProps,
62+
mut observer: F,
63+
) -> Result<LogicalPlan>
64+
where
65+
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
66+
{
67+
let execution_props = execution_props.start_execution();
68+
69+
let mut new_plan = plan.clone();
70+
debug!("Input logical plan:\n{}\n", plan.display_indent());
71+
trace!("Full input logical plan:\n{:?}", plan);
72+
for rule in &self.rules {
73+
new_plan = rule.optimize(&new_plan, execution_props)?;
74+
observer(&new_plan, rule.as_ref());
75+
}
76+
debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
77+
trace!("Full Optimized logical plan:\n {:?}", plan);
78+
Ok(new_plan)
79+
}
80+
}

datafusion/core/src/optimizer/to_approx_perc.rs

Lines changed: 0 additions & 161 deletions
This file was deleted.

0 commit comments

Comments
 (0)