Skip to content

Commit

Permalink
refactor: migrate GroupValuesColumn to HashTable
Browse files Browse the repository at this point in the history
For #13433.
  • Loading branch information
crepererum committed Dec 2, 2024
1 parent f517ed2 commit 655bb3d
Showing 1 changed file with 53 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ use arrow_array::{Array, ArrayRef};
use arrow_schema::{DataType, Schema, SchemaRef, TimeUnit};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt};
use datafusion_expr::EmitTo;
use datafusion_physical_expr::binary_map::OutputType;

use hashbrown::raw::RawTable;
use hashbrown::hash_table::HashTable;

const NON_INLINED_FLAG: u64 = 0x8000000000000000;
const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF;
Expand Down Expand Up @@ -180,7 +180,7 @@ pub struct GroupValuesColumn<const STREAMING: bool> {
/// And we use [`GroupIndexView`] to represent such `group indices` in table.
///
///
map: RawTable<(u64, GroupIndexView)>,
map: HashTable<(u64, GroupIndexView)>,

/// The size of `map` in bytes
map_size: usize,
Expand Down Expand Up @@ -261,7 +261,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {

/// Create a new instance of GroupValuesColumn if supported for the specified schema
pub fn try_new(schema: SchemaRef) -> Result<Self> {
let map = RawTable::with_capacity(0);
let map = HashTable::with_capacity(0);
Ok(Self {
schema,
map,
Expand Down Expand Up @@ -338,7 +338,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
for (row, &target_hash) in batch_hashes.iter().enumerate() {
let entry = self
.map
.get_mut(target_hash, |(exist_hash, group_idx_view)| {
.find_mut(target_hash, |(exist_hash, group_idx_view)| {
// It is ensured to be inlined in `scalarized_intern`
debug_assert!(!group_idx_view.is_non_inlined());

Expand Down Expand Up @@ -506,7 +506,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
for (row, &target_hash) in batch_hashes.iter().enumerate() {
let entry = self
.map
.get(target_hash, |(exist_hash, _)| target_hash == *exist_hash);
.find(target_hash, |(exist_hash, _)| target_hash == *exist_hash);

let Some((_, group_index_view)) = entry else {
// 1. Bucket not found case
Expand Down Expand Up @@ -733,7 +733,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {

for &row in &self.vectorized_operation_buffers.remaining_row_indices {
let target_hash = batch_hashes[row];
let entry = map.get_mut(target_hash, |(exist_hash, _)| {
let entry = map.find_mut(target_hash, |(exist_hash, _)| {
// Somewhat surprisingly, this closure can be called even if the
// hash doesn't match, so check the hash first with an integer
// comparison first avoid the more expensive comparison with
Expand Down Expand Up @@ -852,7 +852,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
/// Return group indices of the hash, also if its `group_index_view` is non-inlined
#[cfg(test)]
fn get_indices_by_hash(&self, hash: u64) -> Option<(Vec<usize>, GroupIndexView)> {
let entry = self.map.get(hash, |(exist_hash, _)| hash == *exist_hash);
let entry = self.map.find(hash, |(exist_hash, _)| hash == *exist_hash);

match entry {
Some((_, group_index_view)) => {
Expand Down Expand Up @@ -1083,67 +1083,63 @@ impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {
.collect::<Vec<_>>();
let mut next_new_list_offset = 0;

// SAFETY: self.map outlives iterator and is not modified concurrently
unsafe {
for bucket in self.map.iter() {
// In non-streaming case, we need to check if the `group index view`
// is `inlined` or `non-inlined`
if !STREAMING && bucket.as_ref().1.is_non_inlined() {
// Non-inlined case
// We take `group_index_list` from `old_group_index_lists`

// list_offset is incrementally
self.emit_group_index_list_buffer.clear();
let list_offset = bucket.as_ref().1.value() as usize;
for group_index in self.group_index_lists[list_offset].iter()
{
if let Some(remaining) = group_index.checked_sub(n) {
self.emit_group_index_list_buffer.push(remaining);
}
self.map.retain(|(_exist_hash, group_idx_view)| {
// In non-streaming case, we need to check if the `group index view`
// is `inlined` or `non-inlined`
if !STREAMING && group_idx_view.is_non_inlined() {
// Non-inlined case
// We take `group_index_list` from `old_group_index_lists`

// list_offset is incrementally
self.emit_group_index_list_buffer.clear();
let list_offset = group_idx_view.value() as usize;
for group_index in self.group_index_lists[list_offset].iter() {
if let Some(remaining) = group_index.checked_sub(n) {
self.emit_group_index_list_buffer.push(remaining);
}

// The possible results:
// - `new_group_index_list` is empty, we should erase this bucket
// - only one value in `new_group_index_list`, switch the `view` to `inlined`
// - still multiple values in `new_group_index_list`, build and set the new `unlined view`
if self.emit_group_index_list_buffer.is_empty() {
self.map.erase(bucket);
} else if self.emit_group_index_list_buffer.len() == 1 {
let group_index =
self.emit_group_index_list_buffer.first().unwrap();
bucket.as_mut().1 =
GroupIndexView::new_inlined(*group_index as u64);
} else {
let group_index_list =
&mut self.group_index_lists[next_new_list_offset];
group_index_list.clear();
group_index_list
.extend(self.emit_group_index_list_buffer.iter());
bucket.as_mut().1 = GroupIndexView::new_non_inlined(
next_new_list_offset as u64,
);
next_new_list_offset += 1;
}

continue;
}

// The possible results:
// - `new_group_index_list` is empty, we should erase this bucket
// - only one value in `new_group_index_list`, switch the `view` to `inlined`
// - still multiple values in `new_group_index_list`, build and set the new `unlined view`
if self.emit_group_index_list_buffer.is_empty() {
false
} else if self.emit_group_index_list_buffer.len() == 1 {
let group_index =
self.emit_group_index_list_buffer.first().unwrap();
*group_idx_view =
GroupIndexView::new_inlined(*group_index as u64);
true
} else {
let group_index_list =
&mut self.group_index_lists[next_new_list_offset];
group_index_list.clear();
group_index_list
.extend(self.emit_group_index_list_buffer.iter());
*group_idx_view = GroupIndexView::new_non_inlined(
next_new_list_offset as u64,
);
next_new_list_offset += 1;
true
}
} else {
// In `streaming case`, the `group index view` is ensured to be `inlined`
debug_assert!(!bucket.as_ref().1.is_non_inlined());
debug_assert!(!group_idx_view.is_non_inlined());

// Inlined case, we just decrement group index by n)
let group_index = bucket.as_ref().1.value() as usize;
let group_index = group_idx_view.value() as usize;
match group_index.checked_sub(n) {
// Group index was >= n, shift value down
Some(sub) => {
bucket.as_mut().1 =
GroupIndexView::new_inlined(sub as u64)
*group_idx_view = GroupIndexView::new_inlined(sub as u64);
true
}
// Group index was < n, so remove from table
None => self.map.erase(bucket),
None => false,
}
}
}
});

if !STREAMING {
self.group_index_lists.truncate(next_new_list_offset);
Expand Down Expand Up @@ -1234,7 +1230,7 @@ mod tests {
use arrow::{compute::concat_batches, util::pretty::pretty_format_batches};
use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray, StringViewArray};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_common::utils::proxy::RawTableAllocExt;
use datafusion_common::utils::proxy::HashTableAllocExt;
use datafusion_expr::EmitTo;

use crate::aggregates::group_values::{
Expand Down

0 comments on commit 655bb3d

Please sign in to comment.