Skip to content

Commit b3623b4

Browse files
committed
refactor(hash_join): Move JoinHashMap to separate mod
1 parent a4a1b08 commit b3623b4

File tree

4 files changed

+332
-325
lines changed

4 files changed

+332
-325
lines changed

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,13 @@ use crate::{
4444
common::can_project,
4545
handle_state,
4646
hash_utils::create_hashes,
47+
joins::join_hash_map::JoinHashMapOffset,
4748
joins::utils::{
4849
adjust_indices_by_join_type, apply_join_filter_to_indices,
4950
build_batch_from_indices, build_join_schema, check_join_is_valid,
5051
estimate_join_statistics, need_produce_result_in_final,
5152
symmetric_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex,
52-
JoinFilter, JoinHashMap, JoinHashMapOffset, JoinHashMapType, JoinOn, JoinOnRef,
53+
JoinFilter, JoinHashMap, JoinHashMapType, JoinOn, JoinOnRef,
5354
StatefulStreamResult,
5455
},
5556
metrics::{ExecutionPlanMetricsSet, MetricsSet},
Lines changed: 326 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,326 @@
1+
use std::fmt::{self, Debug};
2+
use std::ops::IndexMut;
3+
4+
use hashbrown::hash_table::Entry::{Occupied, Vacant};
5+
use hashbrown::HashTable;
6+
7+
/// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
8+
///
9+
/// By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side,
10+
/// we make sure that we don't have to re-hash the hashmap, which needs access to the key (the hash in this case) value.
11+
///
12+
/// E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1
13+
/// As the key is a hash value, we need to check possible hash collisions in the probe stage
14+
/// During this stage it might be the case that a row is contained the same hashmap value,
15+
/// but the values don't match. Those are checked in the `equal_rows_arr` method.
16+
///
17+
/// The indices (values) are stored in a separate chained list stored in the `Vec<u64>`.
18+
///
19+
/// The first value (+1) is stored in the hashmap, whereas the next value is stored in array at the position value.
20+
///
21+
/// The chain can be followed until the value "0" has been reached, meaning the end of the list.
22+
/// Also see chapter 5.3 of [Balancing vectorized query execution with bandwidth-optimized storage](https://dare.uva.nl/search?identifier=5ccbb60a-38b8-4eeb-858a-e7735dd37487)
23+
///
24+
/// # Example
25+
///
26+
/// ``` text
27+
/// See the example below:
28+
///
29+
/// Insert (10,1) <-- insert hash value 10 with row index 1
30+
/// map:
31+
/// ----------
32+
/// | 10 | 2 |
33+
/// ----------
34+
/// next:
35+
/// ---------------------
36+
/// | 0 | 0 | 0 | 0 | 0 |
37+
/// ---------------------
38+
/// Insert (20,2)
39+
/// map:
40+
/// ----------
41+
/// | 10 | 2 |
42+
/// | 20 | 3 |
43+
/// ----------
44+
/// next:
45+
/// ---------------------
46+
/// | 0 | 0 | 0 | 0 | 0 |
47+
/// ---------------------
48+
/// Insert (10,3) <-- collision! row index 3 has a hash value of 10 as well
49+
/// map:
50+
/// ----------
51+
/// | 10 | 4 |
52+
/// | 20 | 3 |
53+
/// ----------
54+
/// next:
55+
/// ---------------------
56+
/// | 0 | 0 | 0 | 2 | 0 | <--- hash value 10 maps to 4,2 (which means indices values 3,1)
57+
/// ---------------------
58+
/// Insert (10,4) <-- another collision! row index 4 ALSO has a hash value of 10
59+
/// map:
60+
/// ---------
61+
/// | 10 | 5 |
62+
/// | 20 | 3 |
63+
/// ---------
64+
/// next:
65+
/// ---------------------
66+
/// | 0 | 0 | 0 | 2 | 4 | <--- hash value 10 maps to 5,4,2 (which means indices values 4,3,1)
67+
/// ---------------------
68+
/// ```
69+
pub struct JoinHashMap {
70+
// Stores hash value to last row index
71+
map: HashTable<(u64, u64)>,
72+
// Stores indices in chained list data structure
73+
next: Vec<u64>,
74+
}
75+
76+
impl JoinHashMap {
77+
#[cfg(test)]
78+
pub(crate) fn new(map: HashTable<(u64, u64)>, next: Vec<u64>) -> Self {
79+
Self { map, next }
80+
}
81+
82+
pub(crate) fn with_capacity(capacity: usize) -> Self {
83+
JoinHashMap {
84+
map: HashTable::with_capacity(capacity),
85+
next: vec![0; capacity],
86+
}
87+
}
88+
}
89+
90+
// Type of offsets for obtaining indices from JoinHashMap.
91+
pub(crate) type JoinHashMapOffset = (usize, Option<u64>);
92+
93+
// Macro for traversing chained values with limit.
94+
// Early returns in case of reaching output tuples limit.
95+
macro_rules! chain_traverse {
96+
(
97+
$input_indices:ident, $match_indices:ident, $hash_values:ident, $next_chain:ident,
98+
$input_idx:ident, $chain_idx:ident, $deleted_offset:ident, $remaining_output:ident
99+
) => {
100+
let mut i = $chain_idx - 1;
101+
loop {
102+
let match_row_idx = if let Some(offset) = $deleted_offset {
103+
// This arguments means that we prune the next index way before here.
104+
if i < offset as u64 {
105+
// End of the list due to pruning
106+
break;
107+
}
108+
i - offset as u64
109+
} else {
110+
i
111+
};
112+
$match_indices.push(match_row_idx);
113+
$input_indices.push($input_idx as u32);
114+
$remaining_output -= 1;
115+
// Follow the chain to get the next index value
116+
let next = $next_chain[match_row_idx as usize];
117+
118+
if $remaining_output == 0 {
119+
// In case current input index is the last, and no more chain values left
120+
// returning None as whole input has been scanned
121+
let next_offset = if $input_idx == $hash_values.len() - 1 && next == 0 {
122+
None
123+
} else {
124+
Some(($input_idx, Some(next)))
125+
};
126+
return ($input_indices, $match_indices, next_offset);
127+
}
128+
if next == 0 {
129+
// end of list
130+
break;
131+
}
132+
i = next - 1;
133+
}
134+
};
135+
}
136+
137+
// Trait defining methods that must be implemented by a hash map type to be used for joins.
138+
pub trait JoinHashMapType {
139+
/// The type of list used to store the next list
140+
type NextType: IndexMut<usize, Output = u64>;
141+
/// Extend with zero
142+
fn extend_zero(&mut self, len: usize);
143+
/// Returns mutable references to the hash map and the next.
144+
fn get_mut(&mut self) -> (&mut HashTable<(u64, u64)>, &mut Self::NextType);
145+
/// Returns a reference to the hash map.
146+
fn get_map(&self) -> &HashTable<(u64, u64)>;
147+
/// Returns a reference to the next.
148+
fn get_list(&self) -> &Self::NextType;
149+
150+
/// Updates hashmap from iterator of row indices & row hashes pairs.
151+
fn update_from_iter<'a>(
152+
&mut self,
153+
iter: impl Iterator<Item = (usize, &'a u64)>,
154+
deleted_offset: usize,
155+
) {
156+
let (mut_map, mut_list) = self.get_mut();
157+
for (row, &hash_value) in iter {
158+
let entry = mut_map.entry(
159+
hash_value,
160+
|&(hash, _)| hash_value == hash,
161+
|&(hash, _)| hash,
162+
);
163+
164+
match entry {
165+
Occupied(mut occupied_entry) => {
166+
// Already exists: add index to next array
167+
let (_, index) = occupied_entry.get_mut();
168+
let prev_index = *index;
169+
// Store new value inside hashmap
170+
*index = (row + 1) as u64;
171+
// Update chained Vec at `row` with previous value
172+
mut_list[row - deleted_offset] = prev_index;
173+
}
174+
Vacant(vacant_entry) => {
175+
vacant_entry.insert((hash_value, (row + 1) as u64));
176+
// chained list at `row` is already initialized with 0
177+
// meaning end of list
178+
}
179+
}
180+
}
181+
}
182+
183+
/// Returns all pairs of row indices matched by hash.
184+
///
185+
/// This method only compares hashes, so additional further check for actual values
186+
/// equality may be required.
187+
fn get_matched_indices<'a>(
188+
&self,
189+
iter: impl Iterator<Item = (usize, &'a u64)>,
190+
deleted_offset: Option<usize>,
191+
) -> (Vec<u32>, Vec<u64>) {
192+
let mut input_indices = vec![];
193+
let mut match_indices = vec![];
194+
195+
let hash_map = self.get_map();
196+
let next_chain = self.get_list();
197+
for (row_idx, hash_value) in iter {
198+
// Get the hash and find it in the index
199+
if let Some((_, index)) =
200+
hash_map.find(*hash_value, |(hash, _)| *hash_value == *hash)
201+
{
202+
let mut i = *index - 1;
203+
loop {
204+
let match_row_idx = if let Some(offset) = deleted_offset {
205+
// This arguments means that we prune the next index way before here.
206+
if i < offset as u64 {
207+
// End of the list due to pruning
208+
break;
209+
}
210+
i - offset as u64
211+
} else {
212+
i
213+
};
214+
match_indices.push(match_row_idx);
215+
input_indices.push(row_idx as u32);
216+
// Follow the chain to get the next index value
217+
let next = next_chain[match_row_idx as usize];
218+
if next == 0 {
219+
// end of list
220+
break;
221+
}
222+
i = next - 1;
223+
}
224+
}
225+
}
226+
227+
(input_indices, match_indices)
228+
}
229+
230+
/// Matches hashes with taking limit and offset into account.
231+
/// Returns pairs of matched indices along with the starting point for next
232+
/// matching iteration (`None` if limit has not been reached).
233+
///
234+
/// This method only compares hashes, so additional further check for actual values
235+
/// equality may be required.
236+
fn get_matched_indices_with_limit_offset(
237+
&self,
238+
hash_values: &[u64],
239+
deleted_offset: Option<usize>,
240+
limit: usize,
241+
offset: JoinHashMapOffset,
242+
) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>) {
243+
let mut input_indices = vec![];
244+
let mut match_indices = vec![];
245+
246+
let mut remaining_output = limit;
247+
248+
let hash_map: &HashTable<(u64, u64)> = self.get_map();
249+
let next_chain = self.get_list();
250+
251+
// Calculate initial `hash_values` index before iterating
252+
let to_skip = match offset {
253+
// None `initial_next_idx` indicates that `initial_idx` processing has'n been started
254+
(initial_idx, None) => initial_idx,
255+
// Zero `initial_next_idx` indicates that `initial_idx` has been processed during
256+
// previous iteration, and it should be skipped
257+
(initial_idx, Some(0)) => initial_idx + 1,
258+
// Otherwise, process remaining `initial_idx` matches by traversing `next_chain`,
259+
// to start with the next index
260+
(initial_idx, Some(initial_next_idx)) => {
261+
chain_traverse!(
262+
input_indices,
263+
match_indices,
264+
hash_values,
265+
next_chain,
266+
initial_idx,
267+
initial_next_idx,
268+
deleted_offset,
269+
remaining_output
270+
);
271+
272+
initial_idx + 1
273+
}
274+
};
275+
276+
let mut row_idx = to_skip;
277+
for hash_value in &hash_values[to_skip..] {
278+
if let Some((_, index)) =
279+
hash_map.find(*hash_value, |(hash, _)| *hash_value == *hash)
280+
{
281+
chain_traverse!(
282+
input_indices,
283+
match_indices,
284+
hash_values,
285+
next_chain,
286+
row_idx,
287+
index,
288+
deleted_offset,
289+
remaining_output
290+
);
291+
}
292+
row_idx += 1;
293+
}
294+
295+
(input_indices, match_indices, None)
296+
}
297+
}
298+
299+
/// Implementation of `JoinHashMapType` for `JoinHashMap`.
300+
impl JoinHashMapType for JoinHashMap {
301+
type NextType = Vec<u64>;
302+
303+
// Void implementation
304+
fn extend_zero(&mut self, _: usize) {}
305+
306+
/// Get mutable references to the hash map and the next.
307+
fn get_mut(&mut self) -> (&mut HashTable<(u64, u64)>, &mut Self::NextType) {
308+
(&mut self.map, &mut self.next)
309+
}
310+
311+
/// Get a reference to the hash map.
312+
fn get_map(&self) -> &HashTable<(u64, u64)> {
313+
&self.map
314+
}
315+
316+
/// Get a reference to the next.
317+
fn get_list(&self) -> &Self::NextType {
318+
&self.next
319+
}
320+
}
321+
322+
impl Debug for JoinHashMap {
323+
fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
324+
Ok(())
325+
}
326+
}

datafusion/physical-plan/src/joins/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ mod symmetric_hash_join;
3434
pub mod utils;
3535

3636
mod join_filter;
37+
mod join_hash_map;
38+
3739
#[cfg(test)]
3840
pub mod test_utils;
3941

0 commit comments

Comments
 (0)