Skip to content

Commit 561be4f

Browse files
authored
[CBO] JoinSelection Rule, select HashJoin Partition Mode based on the Join Type and available statistics, option for SortMergeJoin (#4219)
* [CBO] JoinSelection Rule, select HashJoin Partition Mode based on the available statistics * Fix HashJoin CollectLeft bug, refine UT to cover 'enable'/'disable' repartition_joins * add comments * ignore 0 stats * Resolve review comments, add intg UT for SMJ * fix conflicts * tiny fix to doc * refine swap_join_filter() * update configs.md * fix configs.md
1 parent 22fdbcf commit 561be4f

File tree

17 files changed

+2100
-1225
lines changed

17 files changed

+2100
-1225
lines changed

datafusion/core/src/config.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,17 @@ pub const OPT_CATALOG_LOCATION: &str = "datafusion.catalog.location";
7474
/// Type of `TableProvider` to use when loading `default` schema
7575
pub const OPT_CATALOG_TYPE: &str = "datafusion.catalog.type";
7676

77+
/// Configuration option "datafusion.optimizer.top_down_join_key_reordering"
78+
pub const OPT_TOP_DOWN_JOIN_KEY_REORDERING: &str =
79+
"datafusion.optimizer.top_down_join_key_reordering";
80+
81+
/// Configuration option "datafusion.optimizer.prefer_hash_join"
82+
pub const OPT_PREFER_HASH_JOIN: &str = "datafusion.optimizer.prefer_hash_join";
83+
84+
/// Configuration option "atafusion.optimizer.hash_join_single_partition_threshold"
85+
pub const OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD: &str =
86+
"datafusion.optimizer.hash_join_single_partition_threshold";
87+
7788
/// Definition of a configuration option
7889
pub struct ConfigDefinition {
7990
/// key used to identifier this configuration option
@@ -266,6 +277,22 @@ impl BuiltInConfigs {
266277
"Type of `TableProvider` to use when loading `default` schema. Defaults to None",
267278
None,
268279
),
280+
ConfigDefinition::new_bool(
281+
OPT_TOP_DOWN_JOIN_KEY_REORDERING,
282+
"When set to true, the physical plan optimizer will run a top down process to reorder the join keys. Defaults to true",
283+
true,
284+
),
285+
ConfigDefinition::new_bool(
286+
OPT_PREFER_HASH_JOIN,
287+
"When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently\
288+
than SortMergeJoin but consumes more memory. Defaults to true",
289+
true,
290+
),
291+
ConfigDefinition::new_u64(
292+
OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD,
293+
"The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition",
294+
1024 * 1024,
295+
),
269296
]
270297
}
271298
}

datafusion/core/src/execution/context.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ use crate::{
2626
logical_expr::{PlanType, ToStringifiedPlan},
2727
optimizer::optimizer::Optimizer,
2828
physical_optimizer::{
29-
aggregate_statistics::AggregateStatistics,
30-
hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule,
29+
aggregate_statistics::AggregateStatistics, join_selection::JoinSelection,
30+
optimizer::PhysicalOptimizerRule,
3131
},
3232
};
3333
pub use datafusion_physical_expr::execution_props::ExecutionProps;
@@ -1166,8 +1166,6 @@ pub struct SessionConfig {
11661166
pub parquet_pruning: bool,
11671167
/// Should DataFusion collect statistics after listing files
11681168
pub collect_statistics: bool,
1169-
/// Should DataFusion optimizer run a top down process to reorder the join keys
1170-
pub top_down_join_key_reordering: bool,
11711169
/// Configuration options
11721170
pub config_options: Arc<RwLock<ConfigOptions>>,
11731171
/// Opaque extensions.
@@ -1187,7 +1185,6 @@ impl Default for SessionConfig {
11871185
repartition_windows: true,
11881186
parquet_pruning: true,
11891187
collect_statistics: false,
1190-
top_down_join_key_reordering: true,
11911188
config_options: Arc::new(RwLock::new(ConfigOptions::new())),
11921189
// Assume no extensions by default.
11931190
extensions: HashMap::with_capacity_and_hasher(
@@ -1508,7 +1505,7 @@ impl SessionState {
15081505

15091506
let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
15101507
Arc::new(AggregateStatistics::new()),
1511-
Arc::new(HashBuildProbeOrder::new()),
1508+
Arc::new(JoinSelection::new()),
15121509
];
15131510
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
15141511
if config

datafusion/core/src/physical_optimizer/enforcement.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! Enforcement optimizer rules are used to make sure the plan's Distribution and Ordering
1919
//! requirements are met by inserting necessary [[RepartitionExec]] and [[SortExec]].
2020
//!
21+
use crate::config::OPT_TOP_DOWN_JOIN_KEY_REORDERING;
2122
use crate::error::Result;
2223
use crate::physical_optimizer::PhysicalOptimizerRule;
2324
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
@@ -72,7 +73,11 @@ impl PhysicalOptimizerRule for BasicEnforcement {
7273
config: &SessionConfig,
7374
) -> Result<Arc<dyn ExecutionPlan>> {
7475
let target_partitions = config.target_partitions;
75-
let top_down_join_key_reordering = config.top_down_join_key_reordering;
76+
let top_down_join_key_reordering = config
77+
.config_options()
78+
.read()
79+
.get_bool(OPT_TOP_DOWN_JOIN_KEY_REORDERING)
80+
.unwrap_or_default();
7681
let new_plan = if top_down_join_key_reordering {
7782
// Run a top-down process to adjust input key ordering recursively
7883
let plan_requirements = PlanWithKeyRequirements::new(plan);
@@ -209,6 +214,12 @@ fn adjust_input_keys_ordering(
209214
request_key_ordering: vec![None, new_right_request],
210215
}))
211216
}
217+
PartitionMode::Auto => {
218+
// Can not satisfy, clear the current requirements and generate new empty requirements
219+
Ok(Some(PlanWithKeyRequirements::new(
220+
requirements.plan.clone(),
221+
)))
222+
}
212223
}
213224
} else if let Some(CrossJoinExec { left, .. }) =
214225
plan_any.downcast_ref::<CrossJoinExec>()

0 commit comments

Comments
 (0)