Skip to content

Commit 6ce109b

Browse files
authored
Reduce rehashing cost for primitive grouping by also reusing hash value (#15962)
* also save hash in hashtable in primitive single group by. * address cr.
1 parent 55ba4ca commit 6ce109b

File tree

1 file changed

+15
-11
lines changed
  • datafusion/physical-plan/src/aggregates/group_values/single_group_by

1 file changed

+15
-11
lines changed

datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,14 @@ hash_float!(f16, f32, f64);
8181
pub struct GroupValuesPrimitive<T: ArrowPrimitiveType> {
8282
/// The data type of the output array
8383
data_type: DataType,
84-
/// Stores the group index based on the hash of its value
84+
/// Stores the `(group_index, hash)` pair, keyed by the hash of the value
8585
///
86-
/// We don't store the hashes as hashing fixed width primitives
87-
/// is fast enough for this not to benefit performance
88-
map: HashTable<usize>,
86+
/// We also store the `hash` to reduce the cost of rehashing; that cost
87+
/// is significant in high-cardinality group-by situations.
88+
/// For more details, see:
89+
/// <https://github.com/apache/datafusion/issues/15961>
90+
///
91+
map: HashTable<(usize, u64)>,
8992
/// The group index of the null value if any
9093
null_group: Option<usize>,
9194
/// The values for each group index
@@ -127,15 +130,15 @@ where
127130
let hash = key.hash(state);
128131
let insert = self.map.entry(
129132
hash,
130-
|g| unsafe { self.values.get_unchecked(*g).is_eq(key) },
131-
|g| unsafe { self.values.get_unchecked(*g).hash(state) },
133+
|&(g, _)| unsafe { self.values.get_unchecked(g).is_eq(key) },
134+
|&(_, h)| h,
132135
);
133136

134137
match insert {
135-
hashbrown::hash_table::Entry::Occupied(o) => *o.get(),
138+
hashbrown::hash_table::Entry::Occupied(o) => o.get().0,
136139
hashbrown::hash_table::Entry::Vacant(v) => {
137140
let g = self.values.len();
138-
v.insert(g);
141+
v.insert((g, hash));
139142
self.values.push(key);
140143
g
141144
}
@@ -148,7 +151,7 @@ where
148151
}
149152

150153
fn size(&self) -> usize {
151-
self.map.capacity() * size_of::<usize>() + self.values.allocated_size()
154+
self.map.capacity() * size_of::<(usize, u64)>() + self.values.allocated_size()
152155
}
153156

154157
fn is_empty(&self) -> bool {
@@ -181,12 +184,13 @@ where
181184
build_primitive(std::mem::take(&mut self.values), self.null_group.take())
182185
}
183186
EmitTo::First(n) => {
184-
self.map.retain(|group_idx| {
187+
self.map.retain(|entry| {
185188
// Decrement group index by n
189+
let group_idx = entry.0;
186190
match group_idx.checked_sub(n) {
187191
// Group index was >= n, shift value down
188192
Some(sub) => {
189-
*group_idx = sub;
193+
entry.0 = sub;
190194
true
191195
}
192196
// Group index was < n, so remove from table

0 commit comments

Comments
 (0)