Skip to content

Commit cd8f90c

Browse files
committed
refactor(hash_join): Move JoinHashMap to separate mod
1 parent a4a1b08 commit cd8f90c

File tree

4 files changed

+349
-325
lines changed

4 files changed

+349
-325
lines changed

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,13 @@ use crate::{
4444
common::can_project,
4545
handle_state,
4646
hash_utils::create_hashes,
47+
joins::join_hash_map::JoinHashMapOffset,
4748
joins::utils::{
4849
adjust_indices_by_join_type, apply_join_filter_to_indices,
4950
build_batch_from_indices, build_join_schema, check_join_is_valid,
5051
estimate_join_statistics, need_produce_result_in_final,
5152
symmetric_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex,
52-
JoinFilter, JoinHashMap, JoinHashMapOffset, JoinHashMapType, JoinOn, JoinOnRef,
53+
JoinFilter, JoinHashMap, JoinHashMapType, JoinOn, JoinOnRef,
5354
StatefulStreamResult,
5455
},
5556
metrics::{ExecutionPlanMetricsSet, MetricsSet},
Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::fmt::{self, Debug};
19+
use std::ops::IndexMut;
20+
21+
use hashbrown::hash_table::Entry::{Occupied, Vacant};
22+
use hashbrown::HashTable;
23+
24+
/// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
25+
///
26+
/// By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side,
27+
/// we make sure that we don't have to re-hash the hashmap, which needs access to the key (the hash in this case) value.
28+
///
29+
/// E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1
30+
/// As the key is a hash value, we need to check possible hash collisions in the probe stage
31+
/// During this stage it might be the case that a row is contained the same hashmap value,
32+
/// but the values don't match. Those are checked in the `equal_rows_arr` method.
33+
///
34+
/// The indices (values) are stored in a separate chained list stored in the `Vec<u64>`.
35+
///
36+
/// The first value (+1) is stored in the hashmap, whereas the next value is stored in array at the position value.
37+
///
38+
/// The chain can be followed until the value "0" has been reached, meaning the end of the list.
39+
/// Also see chapter 5.3 of [Balancing vectorized query execution with bandwidth-optimized storage](https://dare.uva.nl/search?identifier=5ccbb60a-38b8-4eeb-858a-e7735dd37487)
40+
///
41+
/// # Example
42+
///
43+
/// ``` text
44+
/// See the example below:
45+
///
46+
/// Insert (10,1) <-- insert hash value 10 with row index 1
47+
/// map:
48+
/// ----------
49+
/// | 10 | 2 |
50+
/// ----------
51+
/// next:
52+
/// ---------------------
53+
/// | 0 | 0 | 0 | 0 | 0 |
54+
/// ---------------------
55+
/// Insert (20,2)
56+
/// map:
57+
/// ----------
58+
/// | 10 | 2 |
59+
/// | 20 | 3 |
60+
/// ----------
61+
/// next:
62+
/// ---------------------
63+
/// | 0 | 0 | 0 | 0 | 0 |
64+
/// ---------------------
65+
/// Insert (10,3) <-- collision! row index 3 has a hash value of 10 as well
66+
/// map:
67+
/// ----------
68+
/// | 10 | 4 |
69+
/// | 20 | 3 |
70+
/// ----------
71+
/// next:
72+
/// ---------------------
73+
/// | 0 | 0 | 0 | 2 | 0 | <--- hash value 10 maps to 4,2 (which means indices values 3,1)
74+
/// ---------------------
75+
/// Insert (10,4) <-- another collision! row index 4 ALSO has a hash value of 10
76+
/// map:
77+
/// ---------
78+
/// | 10 | 5 |
79+
/// | 20 | 3 |
80+
/// ---------
81+
/// next:
82+
/// ---------------------
83+
/// | 0 | 0 | 0 | 2 | 4 | <--- hash value 10 maps to 5,4,2 (which means indices values 4,3,1)
84+
/// ---------------------
85+
/// ```
86+
pub struct JoinHashMap {
87+
// Stores hash value to last row index
88+
map: HashTable<(u64, u64)>,
89+
// Stores indices in chained list data structure
90+
next: Vec<u64>,
91+
}
92+
93+
impl JoinHashMap {
94+
#[cfg(test)]
95+
pub(crate) fn new(map: HashTable<(u64, u64)>, next: Vec<u64>) -> Self {
96+
Self { map, next }
97+
}
98+
99+
pub(crate) fn with_capacity(capacity: usize) -> Self {
100+
JoinHashMap {
101+
map: HashTable::with_capacity(capacity),
102+
next: vec![0; capacity],
103+
}
104+
}
105+
}
106+
107+
// Type of offsets for obtaining indices from JoinHashMap.
108+
pub(crate) type JoinHashMapOffset = (usize, Option<u64>);
109+
110+
// Macro for traversing chained values with limit.
111+
// Early returns in case of reaching output tuples limit.
112+
macro_rules! chain_traverse {
113+
(
114+
$input_indices:ident, $match_indices:ident, $hash_values:ident, $next_chain:ident,
115+
$input_idx:ident, $chain_idx:ident, $deleted_offset:ident, $remaining_output:ident
116+
) => {
117+
let mut i = $chain_idx - 1;
118+
loop {
119+
let match_row_idx = if let Some(offset) = $deleted_offset {
120+
// This arguments means that we prune the next index way before here.
121+
if i < offset as u64 {
122+
// End of the list due to pruning
123+
break;
124+
}
125+
i - offset as u64
126+
} else {
127+
i
128+
};
129+
$match_indices.push(match_row_idx);
130+
$input_indices.push($input_idx as u32);
131+
$remaining_output -= 1;
132+
// Follow the chain to get the next index value
133+
let next = $next_chain[match_row_idx as usize];
134+
135+
if $remaining_output == 0 {
136+
// In case current input index is the last, and no more chain values left
137+
// returning None as whole input has been scanned
138+
let next_offset = if $input_idx == $hash_values.len() - 1 && next == 0 {
139+
None
140+
} else {
141+
Some(($input_idx, Some(next)))
142+
};
143+
return ($input_indices, $match_indices, next_offset);
144+
}
145+
if next == 0 {
146+
// end of list
147+
break;
148+
}
149+
i = next - 1;
150+
}
151+
};
152+
}
153+
154+
// Trait defining methods that must be implemented by a hash map type to be used for joins.
155+
pub trait JoinHashMapType {
156+
/// The type of list used to store the next list
157+
type NextType: IndexMut<usize, Output = u64>;
158+
/// Extend with zero
159+
fn extend_zero(&mut self, len: usize);
160+
/// Returns mutable references to the hash map and the next.
161+
fn get_mut(&mut self) -> (&mut HashTable<(u64, u64)>, &mut Self::NextType);
162+
/// Returns a reference to the hash map.
163+
fn get_map(&self) -> &HashTable<(u64, u64)>;
164+
/// Returns a reference to the next.
165+
fn get_list(&self) -> &Self::NextType;
166+
167+
/// Updates hashmap from iterator of row indices & row hashes pairs.
168+
fn update_from_iter<'a>(
169+
&mut self,
170+
iter: impl Iterator<Item = (usize, &'a u64)>,
171+
deleted_offset: usize,
172+
) {
173+
let (mut_map, mut_list) = self.get_mut();
174+
for (row, &hash_value) in iter {
175+
let entry = mut_map.entry(
176+
hash_value,
177+
|&(hash, _)| hash_value == hash,
178+
|&(hash, _)| hash,
179+
);
180+
181+
match entry {
182+
Occupied(mut occupied_entry) => {
183+
// Already exists: add index to next array
184+
let (_, index) = occupied_entry.get_mut();
185+
let prev_index = *index;
186+
// Store new value inside hashmap
187+
*index = (row + 1) as u64;
188+
// Update chained Vec at `row` with previous value
189+
mut_list[row - deleted_offset] = prev_index;
190+
}
191+
Vacant(vacant_entry) => {
192+
vacant_entry.insert((hash_value, (row + 1) as u64));
193+
// chained list at `row` is already initialized with 0
194+
// meaning end of list
195+
}
196+
}
197+
}
198+
}
199+
200+
/// Returns all pairs of row indices matched by hash.
201+
///
202+
/// This method only compares hashes, so additional further check for actual values
203+
/// equality may be required.
204+
fn get_matched_indices<'a>(
205+
&self,
206+
iter: impl Iterator<Item = (usize, &'a u64)>,
207+
deleted_offset: Option<usize>,
208+
) -> (Vec<u32>, Vec<u64>) {
209+
let mut input_indices = vec![];
210+
let mut match_indices = vec![];
211+
212+
let hash_map = self.get_map();
213+
let next_chain = self.get_list();
214+
for (row_idx, hash_value) in iter {
215+
// Get the hash and find it in the index
216+
if let Some((_, index)) =
217+
hash_map.find(*hash_value, |(hash, _)| *hash_value == *hash)
218+
{
219+
let mut i = *index - 1;
220+
loop {
221+
let match_row_idx = if let Some(offset) = deleted_offset {
222+
// This arguments means that we prune the next index way before here.
223+
if i < offset as u64 {
224+
// End of the list due to pruning
225+
break;
226+
}
227+
i - offset as u64
228+
} else {
229+
i
230+
};
231+
match_indices.push(match_row_idx);
232+
input_indices.push(row_idx as u32);
233+
// Follow the chain to get the next index value
234+
let next = next_chain[match_row_idx as usize];
235+
if next == 0 {
236+
// end of list
237+
break;
238+
}
239+
i = next - 1;
240+
}
241+
}
242+
}
243+
244+
(input_indices, match_indices)
245+
}
246+
247+
/// Matches hashes with taking limit and offset into account.
248+
/// Returns pairs of matched indices along with the starting point for next
249+
/// matching iteration (`None` if limit has not been reached).
250+
///
251+
/// This method only compares hashes, so additional further check for actual values
252+
/// equality may be required.
253+
fn get_matched_indices_with_limit_offset(
254+
&self,
255+
hash_values: &[u64],
256+
deleted_offset: Option<usize>,
257+
limit: usize,
258+
offset: JoinHashMapOffset,
259+
) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>) {
260+
let mut input_indices = vec![];
261+
let mut match_indices = vec![];
262+
263+
let mut remaining_output = limit;
264+
265+
let hash_map: &HashTable<(u64, u64)> = self.get_map();
266+
let next_chain = self.get_list();
267+
268+
// Calculate initial `hash_values` index before iterating
269+
let to_skip = match offset {
270+
// None `initial_next_idx` indicates that `initial_idx` processing has'n been started
271+
(initial_idx, None) => initial_idx,
272+
// Zero `initial_next_idx` indicates that `initial_idx` has been processed during
273+
// previous iteration, and it should be skipped
274+
(initial_idx, Some(0)) => initial_idx + 1,
275+
// Otherwise, process remaining `initial_idx` matches by traversing `next_chain`,
276+
// to start with the next index
277+
(initial_idx, Some(initial_next_idx)) => {
278+
chain_traverse!(
279+
input_indices,
280+
match_indices,
281+
hash_values,
282+
next_chain,
283+
initial_idx,
284+
initial_next_idx,
285+
deleted_offset,
286+
remaining_output
287+
);
288+
289+
initial_idx + 1
290+
}
291+
};
292+
293+
let mut row_idx = to_skip;
294+
for hash_value in &hash_values[to_skip..] {
295+
if let Some((_, index)) =
296+
hash_map.find(*hash_value, |(hash, _)| *hash_value == *hash)
297+
{
298+
chain_traverse!(
299+
input_indices,
300+
match_indices,
301+
hash_values,
302+
next_chain,
303+
row_idx,
304+
index,
305+
deleted_offset,
306+
remaining_output
307+
);
308+
}
309+
row_idx += 1;
310+
}
311+
312+
(input_indices, match_indices, None)
313+
}
314+
}
315+
316+
/// Implementation of `JoinHashMapType` for `JoinHashMap`.
317+
impl JoinHashMapType for JoinHashMap {
318+
type NextType = Vec<u64>;
319+
320+
// Void implementation
321+
fn extend_zero(&mut self, _: usize) {}
322+
323+
/// Get mutable references to the hash map and the next.
324+
fn get_mut(&mut self) -> (&mut HashTable<(u64, u64)>, &mut Self::NextType) {
325+
(&mut self.map, &mut self.next)
326+
}
327+
328+
/// Get a reference to the hash map.
329+
fn get_map(&self) -> &HashTable<(u64, u64)> {
330+
&self.map
331+
}
332+
333+
/// Get a reference to the next.
334+
fn get_list(&self) -> &Self::NextType {
335+
&self.next
336+
}
337+
}
338+
339+
impl Debug for JoinHashMap {
340+
fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
341+
Ok(())
342+
}
343+
}

datafusion/physical-plan/src/joins/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ mod symmetric_hash_join;
3434
pub mod utils;
3535

3636
mod join_filter;
37+
mod join_hash_map;
38+
3739
#[cfg(test)]
3840
pub mod test_utils;
3941

0 commit comments

Comments
 (0)