Skip to content

Reduce rehashing cost for primitive grouping by also reusing hash value #15962

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,14 @@ hash_float!(f16, f32, f64);
pub struct GroupValuesPrimitive<T: ArrowPrimitiveType> {
/// The data type of the output array
data_type: DataType,
/// Stores the group index based on the hash of its value
/// Stores the `(group_index, hash)` based on the hash of its value
///
/// We don't store the hashes as hashing fixed width primitives
/// is fast enough for this not to benefit performance
map: HashTable<usize>,
/// We also store `hash` is for reducing cost of rehashing. Such cost
/// is obvious in high cardinality group by situation.
/// More details can see:
/// <https://github.com/apache/datafusion/issues/15961>
///
map: HashTable<(usize, u64)>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like the comment right above this line is no longer accurate and needs to be updated as this PR exactly saves the hashes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

/// The group index of the null value if any
null_group: Option<usize>,
/// The values for each group index
Expand Down Expand Up @@ -127,15 +130,15 @@ where
let hash = key.hash(state);
let insert = self.map.entry(
hash,
|g| unsafe { self.values.get_unchecked(*g).is_eq(key) },
|g| unsafe { self.values.get_unchecked(*g).hash(state) },
|&(g, _)| unsafe { self.values.get_unchecked(g).is_eq(key) },
|&(_, h)| h,
);

match insert {
hashbrown::hash_table::Entry::Occupied(o) => *o.get(),
hashbrown::hash_table::Entry::Occupied(o) => o.get().0,
hashbrown::hash_table::Entry::Vacant(v) => {
let g = self.values.len();
v.insert(g);
v.insert((g, hash));
self.values.push(key);
g
}
Expand All @@ -148,7 +151,7 @@ where
}

fn size(&self) -> usize {
self.map.capacity() * size_of::<usize>() + self.values.allocated_size()
self.map.capacity() * size_of::<(usize, u64)>() + self.values.allocated_size()
}

fn is_empty(&self) -> bool {
Expand Down Expand Up @@ -181,12 +184,13 @@ where
build_primitive(std::mem::take(&mut self.values), self.null_group.take())
}
EmitTo::First(n) => {
self.map.retain(|group_idx| {
self.map.retain(|entry| {
// Decrement group index by n
let group_idx = entry.0;
match group_idx.checked_sub(n) {
// Group index was >= n, shift value down
Some(sub) => {
*group_idx = sub;
entry.0 = sub;
true
}
// Group index was < n, so remove from table
Expand Down