Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions datafusion/physical-plan/src/joins/join_hash_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ pub trait JoinHashMapType {
/// Returns a reference to the next.
fn get_list(&self) -> &Self::NextType;

// Whether values in the hashmap are unique
fn is_unique(&self) -> bool;

/// Updates hashmap from iterator of row indices & row hashes pairs.
fn update_from_iter<'a>(
&mut self,
Expand Down Expand Up @@ -261,13 +264,26 @@ pub trait JoinHashMapType {
limit: usize,
offset: JoinHashMapOffset,
) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>) {
let mut input_indices = vec![];
let mut match_indices = vec![];

let mut remaining_output = limit;
let mut input_indices = Vec::with_capacity(limit);
let mut match_indices = Vec::with_capacity(limit);

let hash_map: &HashTable<(u64, u64)> = self.get_map();
let next_chain = self.get_list();
// Check if hashmap consists of unique values
// If so, we can skip the chain traversal
if self.is_unique() {
for (row_idx, &hash_value) in hash_values.iter().enumerate() {
if let Some((_, index)) =
hash_map.find(hash_value, |(hash, _)| hash_value == *hash)
{
input_indices.push(row_idx as u32);
match_indices.push(index - 1);
}
}
return (input_indices, match_indices, None);
}

let mut remaining_output = limit;

// Calculate initial `hash_values` index before iterating
let to_skip = match offset {
Expand Down Expand Up @@ -295,6 +311,7 @@ pub trait JoinHashMapType {
};

let mut row_idx = to_skip;

for hash_value in &hash_values[to_skip..] {
if let Some((_, index)) =
hash_map.find(*hash_value, |(hash, _)| *hash_value == *hash)
Expand Down Expand Up @@ -338,6 +355,11 @@ impl JoinHashMapType for JoinHashMap {
fn get_list(&self) -> &Self::NextType {
&self.next
}

// /// Check if the values in the hashmap are unique.
fn is_unique(&self) -> bool {
self.map.len() == self.next.len()
}
}

impl Debug for JoinHashMap {
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/joins/stream_join_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ impl JoinHashMapType for PruningJoinHashMap {
fn get_list(&self) -> &Self::NextType {
&self.next
}

fn is_unique(&self) -> bool {
// TODO
false
}
}

/// The `PruningJoinHashMap` is similar to a regular `JoinHashMap`, but with
Expand Down
Loading