Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1307,8 +1307,8 @@ fn lookup_join_hashmap(
limit: usize,
offset: JoinHashMapOffset,
) -> Result<(UInt64Array, UInt32Array, Option<JoinHashMapOffset>)> {
let (probe_indices, build_indices, next_offset) = build_hashmap
.get_matched_indices_with_limit_offset(hashes_buffer, None, limit, offset);
let (probe_indices, build_indices, next_offset) =
build_hashmap.get_matched_indices_with_limit_offset(hashes_buffer, limit, offset);

let build_indices: UInt64Array = build_indices.into();
let probe_indices: UInt32Array = probe_indices.into();
Expand Down Expand Up @@ -3333,7 +3333,7 @@ mod tests {

#[test]
fn join_with_hash_collision() -> Result<()> {
let mut hashmap_left = HashTable::with_capacity(2);
let mut hashmap_left = HashTable::with_capacity(4);
let left = build_table_i32(
("a", &vec![10, 20]),
("x", &vec![100, 200]),
Expand All @@ -3348,9 +3348,15 @@ mod tests {
hashes_buff,
)?;

// Create hash collisions (same hashes)
// Maps both values to both indices (1 and 2, representing input 0 and 1)
// 0 -> (0, 1)
// 1 -> (0, 2)
// The equality check will make sure only hashes[0] maps to 0 and hashes[1] maps to 1
hashmap_left.insert_unique(hashes[0], (hashes[0], 1), |(h, _)| *h);
hashmap_left.insert_unique(hashes[0], (hashes[0], 2), |(h, _)| *h);

hashmap_left.insert_unique(hashes[1], (hashes[1], 1), |(h, _)| *h);
hashmap_left.insert_unique(hashes[1], (hashes[1], 2), |(h, _)| *h);

let next = vec![2, 0];

Expand Down
57 changes: 37 additions & 20 deletions datafusion/physical-plan/src/joins/join_hash_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,20 +116,10 @@ pub(crate) type JoinHashMapOffset = (usize, Option<u64>);
macro_rules! chain_traverse {
(
$input_indices:ident, $match_indices:ident, $hash_values:ident, $next_chain:ident,
$input_idx:ident, $chain_idx:ident, $deleted_offset:ident, $remaining_output:ident
$input_idx:ident, $chain_idx:ident, $remaining_output:ident
) => {
let mut i = $chain_idx - 1;
let mut match_row_idx = $chain_idx - 1;
loop {
let match_row_idx = if let Some(offset) = $deleted_offset {
// This arguments means that we prune the next index way before here.
if i < offset as u64 {
// End of the list due to pruning
break;
}
i - offset as u64
} else {
i
};
$match_indices.push(match_row_idx);
$input_indices.push($input_idx as u32);
$remaining_output -= 1;
Expand All @@ -150,7 +140,7 @@ macro_rules! chain_traverse {
// end of list
break;
}
i = next - 1;
match_row_idx = next - 1;
}
};
}
Expand All @@ -168,6 +158,11 @@ pub trait JoinHashMapType {
/// Returns a reference to the next.
fn get_list(&self) -> &Self::NextType;

// Whether values in the hashmap are distinct (no duplicate keys)
fn is_distinct(&self) -> bool {
false
}

/// Updates hashmap from iterator of row indices & row hashes pairs.
fn update_from_iter<'a>(
&mut self,
Expand Down Expand Up @@ -257,17 +252,35 @@ pub trait JoinHashMapType {
fn get_matched_indices_with_limit_offset(
&self,
hash_values: &[u64],
deleted_offset: Option<usize>,
limit: usize,
offset: JoinHashMapOffset,
) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>) {
let mut input_indices = vec![];
let mut match_indices = vec![];

let mut remaining_output = limit;
let mut input_indices = Vec::with_capacity(limit);
let mut match_indices = Vec::with_capacity(limit);
Comment thread
Dandandan marked this conversation as resolved.

let hash_map: &HashTable<(u64, u64)> = self.get_map();
let next_chain = self.get_list();
// Check if hashmap consists of unique values
// If so, we can skip the chain traversal
if self.is_distinct() {
Comment thread
Dandandan marked this conversation as resolved.
let start = offset.0;
let end = (start + limit).min(hash_values.len());
for (row_idx, &hash_value) in hash_values[start..end].iter().enumerate() {
if let Some((_, index)) =
hash_map.find(hash_value, |(hash, _)| hash_value == *hash)
{
input_indices.push(start as u32 + row_idx as u32);
match_indices.push(*index - 1);
}
}
if end == hash_values.len() {
// No more values to process
return (input_indices, match_indices, None);
}
return (input_indices, match_indices, Some((end, None)));
}

let mut remaining_output = limit;

// Calculate initial `hash_values` index before iterating
let to_skip = match offset {
Expand All @@ -286,7 +299,6 @@ pub trait JoinHashMapType {
next_chain,
initial_idx,
initial_next_idx,
deleted_offset,
remaining_output
);

Expand All @@ -295,6 +307,7 @@ pub trait JoinHashMapType {
};

let mut row_idx = to_skip;

for hash_value in &hash_values[to_skip..] {
if let Some((_, index)) =
hash_map.find(*hash_value, |(hash, _)| *hash_value == *hash)
Expand All @@ -306,7 +319,6 @@ pub trait JoinHashMapType {
next_chain,
row_idx,
index,
deleted_offset,
remaining_output
);
}
Expand Down Expand Up @@ -338,6 +350,11 @@ impl JoinHashMapType for JoinHashMap {
fn get_list(&self) -> &Self::NextType {
&self.next
}

/// Check if the values in the hashmap are distinct.
fn is_distinct(&self) -> bool {
self.map.len() == self.next.len()
Comment thread
Dandandan marked this conversation as resolved.
}
}

impl Debug for JoinHashMap {
Expand Down