Commit fe5dfec

Push down InList or hash table references from HashJoinExec depending on the size of the build side

1 parent 486c5d8

File tree: 13 files changed, +1206 −240 lines

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.

datafusion/common/src/config.rs

Lines changed: 16 additions & 0 deletions
@@ -1019,6 +1019,22 @@ config_namespace! {
         /// will be collected into a single partition
         pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128

+        /// Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
+        /// Build sides larger than this will use hash table lookups instead.
+        /// Set to 0 to always use hash table lookups.
+        ///
+        /// InList pushdown can be more efficient for small build sides because it can result in better
+        /// statistics pruning and can make use of any bloom filters present on the scan side.
+        /// InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion.
+        /// On the other hand, InList pushdown requires making a copy of the data, which adds some overhead to the build side and uses more memory.
+        ///
+        /// This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` * `target_partitions` memory.
+        ///
+        /// The default is 128kB per partition.
+        /// This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases,
+        /// while avoiding excessive memory usage or overhead for larger joins.
+        pub hash_join_inlist_pushdown_max_size: usize, default = 128 * 1024
+
         /// The default filter selectivity used by Filter Statistics
         /// when an exact selectivity cannot be determined. Valid values are
         /// between 0 (no selectivity) and 100 (all rows are selected).
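
For reference, a sketch of how this knob could be tuned at runtime. The SessionConfig and SQL SET mechanics are standard DataFusion; the key `datafusion.optimizer.hash_join_inlist_pushdown_max_size` follows from the option being declared in the optimizer namespace, as the accesses in exec.rs below confirm:

    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        // Raise the per-partition InList pushdown cap to 1 MiB.
        let config = SessionConfig::new().set_usize(
            "datafusion.optimizer.hash_join_inlist_pushdown_max_size",
            1024 * 1024,
        );
        let ctx = SessionContext::new_with_config(config);

        // Or disable InList pushdown per session in SQL
        // (0 = always push down a hash table reference instead):
        ctx.sql("SET datafusion.optimizer.hash_join_inlist_pushdown_max_size = 0")
            .await?;
        Ok(())
    }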

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 191 additions & 22 deletions
Large diffs are not rendered by default.

datafusion/physical-expr/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -48,7 +48,6 @@ datafusion-expr = { workspace = true }
 datafusion-expr-common = { workspace = true }
 datafusion-functions-aggregate-common = { workspace = true }
 datafusion-physical-expr-common = { workspace = true }
-half = { workspace = true }
 hashbrown = { workspace = true }
 indexmap = { workspace = true }
 itertools = { workspace = true, features = ["use_std"] }

datafusion/physical-expr/src/expressions/in_list.rs

Lines changed: 8 additions & 0 deletions
@@ -321,6 +321,14 @@ impl InListExpr {
         &self.list
     }

+    pub fn is_empty(&self) -> bool {
+        self.list.is_empty()
+    }
+
+    pub fn len(&self) -> usize {
+        self.list.len()
+    }
+
     /// Is this negated e.g. NOT IN LIST
     pub fn negated(&self) -> bool {
         self.negated
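
These accessors expose the list size so that pushdown code can reason about it. A hypothetical caller, purely for illustration (this helper is not part of the commit):

    use datafusion_physical_expr::expressions::InListExpr;

    // An empty, non-negated IN list matches nothing, and a very large list
    // is better served by the hash-table strategy this commit introduces.
    fn worth_pushing(expr: &InListExpr, max_len: usize) -> bool {
        !expr.is_empty() && expr.len() <= max_len
    }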

datafusion/physical-plan/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -56,6 +56,7 @@ datafusion-common = { workspace = true }
 datafusion-common-runtime = { workspace = true, default-features = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
+datafusion-functions = { workspace = true }
 datafusion-functions-aggregate-common = { workspace = true }
 datafusion-functions-window-common = { workspace = true }
 datafusion-physical-expr = { workspace = true, default-features = true }

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 69 additions & 44 deletions
@@ -26,7 +26,10 @@ use crate::filter_pushdown::{
     ChildPushdownResult, FilterDescription, FilterPushdownPhase,
     FilterPushdownPropagation,
 };
-use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator};
+use crate::joins::hash_join::inlist_builder::build_struct_inlist_values;
+use crate::joins::hash_join::shared_bounds::{
+    ColumnBounds, PushdownStrategy, SharedBuildAccumulator,
+};
 use crate::joins::hash_join::stream::{
     BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
 };
@@ -89,7 +92,8 @@ const HASH_JOIN_SEED: RandomState =
 /// HashTable and input data for the left (build side) of a join
 pub(super) struct JoinLeftData {
     /// The hash table with indices into `batch`
-    pub(super) hash_map: Box<dyn JoinHashMapType>,
+    /// Arc is used to allow sharing with SharedBuildAccumulator for hash map pushdown
+    pub(super) hash_map: Arc<dyn JoinHashMapType>,
     /// The input rows for the build side
     batch: RecordBatch,
     /// The build side on expressions values
@@ -106,30 +110,12 @@ pub(super) struct JoinLeftData {
     _reservation: MemoryReservation,
     /// Bounds computed from the build side for dynamic filter pushdown
     pub(super) bounds: Option<Vec<ColumnBounds>>,
+    /// InList values for small build sides (alternative to hash map pushdown)
+    /// Contains unique values from build side for filter pushdown as InList expressions
+    pub(super) membership: PushdownStrategy,
 }

 impl JoinLeftData {
-    /// Create a new `JoinLeftData` from its parts
-    pub(super) fn new(
-        hash_map: Box<dyn JoinHashMapType>,
-        batch: RecordBatch,
-        values: Vec<ArrayRef>,
-        visited_indices_bitmap: SharedBitmapBuilder,
-        probe_threads_counter: AtomicUsize,
-        reservation: MemoryReservation,
-        bounds: Option<Vec<ColumnBounds>>,
-    ) -> Self {
-        Self {
-            hash_map,
-            batch,
-            values,
-            visited_indices_bitmap,
-            probe_threads_counter,
-            _reservation: reservation,
-            bounds,
-        }
-    }
-
     /// return a reference to the hash map
     pub(super) fn hash_map(&self) -> &dyn JoinHashMapType {
         &*self.hash_map
@@ -150,6 +136,11 @@ impl JoinLeftData {
         &self.visited_indices_bitmap
     }

+    /// returns a reference to the InList values for filter pushdown
+    pub(super) fn membership(&self) -> &PushdownStrategy {
+        &self.membership
+    }
+
     /// Decrements the counter of running threads, and returns `true`
     /// if caller is the last running thread
     pub(super) fn report_probe_completed(&self) -> bool {
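
For orientation: `PushdownStrategy` itself is defined in shared_bounds.rs, which this page does not render. Inferred from the call sites in this diff, its shape is roughly the following; the variant payloads are guesses, and `JoinHashMapType` is stubbed out here because the real trait is crate-internal:

    use std::sync::Arc;
    use arrow::array::ArrayRef;

    // Stand-in for the crate-internal hash table trait (real one not shown here).
    pub trait JoinHashMapType: Send + Sync {}

    // Sketch inferred from call sites, not the actual definition.
    pub enum PushdownStrategy {
        /// Build side produced no rows: the dynamic filter can prune everything.
        Empty,
        /// Small build side: push deduplicated key values down so scans can
        /// evaluate a plain `col IN (...)` expression.
        InList(Vec<ArrayRef>),
        /// Large build side: push a shared reference to the hash table itself
        /// and probe it during filtering.
        HashTable(Arc<dyn JoinHashMapType>),
    }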
@@ -364,9 +355,9 @@ pub struct HashJoinExec {
 struct HashJoinExecDynamicFilter {
     /// Dynamic filter that we'll update with the results of the build side once that is done.
     filter: Arc<DynamicFilterPhysicalExpr>,
-    /// Bounds accumulator to keep track of the min/max bounds on the join keys for each partition.
+    /// Build accumulator to collect build-side information (hash maps and/or bounds) from each partition.
     /// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
-    bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
+    build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
 }

 impl fmt::Debug for HashJoinExec {
@@ -945,6 +936,11 @@ impl ExecutionPlan for HashJoinExec {
                     need_produce_result_in_final(self.join_type),
                     self.right().output_partitioning().partition_count(),
                     enable_dynamic_filter_pushdown,
+                    context
+                        .session_config()
+                        .options()
+                        .optimizer
+                        .hash_join_inlist_pushdown_max_size,
                 ))
             })?,
             PartitionMode::Partitioned => {
@@ -963,6 +959,11 @@
                     need_produce_result_in_final(self.join_type),
                     1,
                     enable_dynamic_filter_pushdown,
+                    context
+                        .session_config()
+                        .options()
+                        .optimizer
+                        .hash_join_inlist_pushdown_max_size,
                 ))
             }
             PartitionMode::Auto => {
@@ -975,8 +976,10 @@

         let batch_size = context.session_config().batch_size();

-        // Initialize bounds_accumulator lazily with runtime partition counts (only if enabled)
-        let bounds_accumulator = enable_dynamic_filter_pushdown
+        // Initialize build_accumulator lazily with runtime partition counts (only if enabled)
+        // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
+        let repartition_random_state = RandomState::with_seeds(0, 0, 0, 0);
+        let build_accumulator = enable_dynamic_filter_pushdown
             .then(|| {
                 self.dynamic_filter.as_ref().map(|df| {
                     let filter = Arc::clone(&df.filter);
@@ -985,13 +988,14 @@
                         .iter()
                         .map(|(_, right_expr)| Arc::clone(right_expr))
                         .collect::<Vec<_>>();
-                    Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
-                        Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
+                    Some(Arc::clone(df.build_accumulator.get_or_init(|| {
+                        Arc::new(SharedBuildAccumulator::new_from_partition_mode(
                             self.mode,
                             self.left.as_ref(),
                             self.right.as_ref(),
                             filter,
                             on_right,
+                            repartition_random_state,
                         ))
                     })))
                 })
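
The fixed seeds are load-bearing: in partitioned mode each build partition only holds the rows RepartitionExec routed to it, so the accumulator must reproduce that routing when it assembles per-partition filters. A toy illustration of why equal seeds make routing reproducible (DataFusion hashes whole arrays via its hashing utilities rather than single values; the string key below is only for demonstration):

    use ahash::RandomState;
    use std::hash::BuildHasher;

    fn main() {
        // Same seeds => same hash function, in any thread or process.
        let a = RandomState::with_seeds(0, 0, 0, 0);
        let b = RandomState::with_seeds(0, 0, 0, 0);
        let key = "some join key";
        assert_eq!(a.hash_one(key), b.hash_one(key));

        // Hash partitioning then routes a row to hash % partition_count.
        let partitions: u64 = 8;
        println!("routed to partition {}", a.hash_one(key) % partitions);
    }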
@@ -1034,7 +1038,7 @@
             batch_size,
             vec![],
             self.right.output_ordering().is_some(),
-            bounds_accumulator,
+            build_accumulator,
             self.mode,
         )))
     }
@@ -1195,7 +1199,7 @@
             cache: self.cache.clone(),
             dynamic_filter: Some(HashJoinExecDynamicFilter {
                 filter: dynamic_filter,
-                bounds_accumulator: OnceLock::new(),
+                build_accumulator: OnceLock::new(),
             }),
         });
         result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
@@ -1344,7 +1348,7 @@ impl BuildSideState {
 /// When `should_compute_bounds` is true, this function computes the min/max bounds
 /// for each join key column but does NOT update the dynamic filter. Instead, the
 /// bounds are stored in the returned `JoinLeftData` and later coordinated by
-/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds
+/// `SharedBuildAccumulator` to ensure all partitions contribute their bounds
 /// before updating the filter exactly once.
 ///
 /// # Returns
@@ -1360,6 +1364,7 @@ async fn collect_left_input(
     with_visited_indices_bitmap: bool,
     probe_threads_count: usize,
     should_compute_bounds: bool,
+    max_inlist_size: usize,
 ) -> Result<JoinLeftData> {
     let schema = left_stream.schema();

@@ -1415,6 +1420,7 @@

     // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the
     // `u64` indice variant
+    // Arc is used instead of Box to allow sharing with SharedBuildAccumulator for hash map pushdown
     let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
         let estimated_hashtable_size =
             estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
@@ -1450,22 +1456,22 @@
         offset += batch.num_rows();
     }
     // Merge all batches into a single batch, so we can directly index into the arrays
-    let single_batch = concat_batches(&schema, batches_iter)?;
+    let batch = concat_batches(&schema, batches_iter)?;

     // Reserve additional memory for visited indices bitmap and create shared builder
     let visited_indices_bitmap = if with_visited_indices_bitmap {
-        let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
+        let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
         reservation.try_grow(bitmap_size)?;
         metrics.build_mem_used.add(bitmap_size);

-        let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
+        let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows());
         bitmap_buffer.append_n(num_rows, false);
         bitmap_buffer
     } else {
         BooleanBufferBuilder::new(0)
     };

-    let left_values = evaluate_expressions_to_arrays(&on_left, &single_batch)?;
+    let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;

     // Compute bounds for dynamic filter if enabled
     let bounds = match bounds_accumulators {
@@ -1479,15 +1485,34 @@
         _ => None,
     };

-    let data = JoinLeftData::new(
-        hashmap,
-        single_batch,
-        left_values.clone(),
-        Mutex::new(visited_indices_bitmap),
-        AtomicUsize::new(probe_threads_count),
-        reservation,
+    // Convert Box to Arc for sharing with SharedBuildAccumulator
+    let hash_map: Arc<dyn JoinHashMapType> = hashmap.into();
+
+    let membership = if num_rows == 0 {
+        PushdownStrategy::Empty
+    } else {
+        // If the build side is small enough we can use IN list pushdown.
+        // If it's too big we fall back to pushing down a reference to the hash table.
+        // See `PushdownStrategy` for more details.
+        if let Some(in_list_values) =
+            build_struct_inlist_values(&left_values, max_inlist_size)?
+        {
+            PushdownStrategy::InList(in_list_values)
+        } else {
+            PushdownStrategy::HashTable(Arc::clone(&hash_map))
+        }
+    };
+
+    let data = JoinLeftData {
+        hash_map,
+        batch,
+        values: left_values,
+        visited_indices_bitmap: Mutex::new(visited_indices_bitmap),
+        probe_threads_counter: AtomicUsize::new(probe_threads_count),
+        _reservation: reservation,
         bounds,
-    );
+        membership,
+    };

     Ok(data)
 }
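
The final hunk above is the heart of the commit: an empty build side short-circuits to `PushdownStrategy::Empty`; otherwise `build_struct_inlist_values` decides between InList and hash-table pushdown. That helper lives in inlist_builder.rs, which this page does not render, so the following is only a sketch of the size gate it implies (the name `inlist_candidate` and the use of `get_array_memory_size` as the cost estimate are assumptions; the real helper also deduplicates values and, per its name, packs multi-column keys into struct values):

    use arrow::array::ArrayRef;
    use datafusion_common::Result;

    // Hypothetical: return Some(values) when the build-side join-key arrays
    // are cheap enough to copy into an InList, None to fall back to pushing
    // a hash table reference.
    fn inlist_candidate(
        left_values: &[ArrayRef],
        max_inlist_size: usize,
    ) -> Result<Option<Vec<ArrayRef>>> {
        // max_inlist_size == 0 disables InList pushdown, per the config docs.
        if max_inlist_size == 0 {
            return Ok(None);
        }
        // In-memory size of the key arrays as an upper bound on the copy cost.
        let estimated: usize = left_values
            .iter()
            .map(|a| a.get_array_memory_size())
            .sum();
        Ok((estimated <= max_inlist_size).then(|| left_values.to_vec()))
    }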
