Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support hashing List columns #7616

Merged
merged 7 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,8 @@ pub fn can_hash(data_type: &DataType) -> bool {
{
DataType::is_dictionary_key_type(key_type)
}
DataType::List(_) => true,
DataType::LargeList(_) => true,
_ => false,
}
}
Expand Down
63 changes: 63 additions & 0 deletions datafusion/physical-expr/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,39 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
Ok(())
}

fn hash_list_array<OffsetSize>(
array: &GenericListArray<OffsetSize>,
random_state: &RandomState,
hashes_buffer: &mut [u64],
) -> Result<()>
where
OffsetSize: OffsetSizeTrait,
{
let values = array.values().clone();
let offsets = array.value_offsets();
let nulls = array.nulls();
let mut values_hashes = vec![0u64; values.len()];
create_hashes(&[values], random_state, &mut values_hashes)?;
if let Some(nulls) = nulls {
for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
if nulls.is_valid(i) {
let hash = &mut hashes_buffer[i];
for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
*hash = combine_hashes(*hash, *values_hash);
}
}
}
} else {
for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
let hash = &mut hashes_buffer[i];
for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
*hash = combine_hashes(*hash, *values_hash);
}
}
}
Ok(())
}

/// Test version of `create_hashes` that produces the same value for
/// all hashes (to test collisions)
///
Expand Down Expand Up @@ -294,6 +327,14 @@ pub fn create_hashes<'a>(
array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
_ => unreachable!()
}
DataType::List(_) => {
let array = as_list_array(array);
hash_list_array(array, random_state, hashes_buffer)?;
}
DataType::LargeList(_) => {
let array = as_large_list_array(array);
hash_list_array(array, random_state, hashes_buffer)?;
}
_ => {
// This is internal because we should have caught this before.
return internal_err!(
Expand Down Expand Up @@ -452,6 +493,28 @@ mod tests {
assert_ne!(dict_hashes[0], dict_hashes[2]);
}

#[test]
// Tests actual values of hashes, which are different if forcing collisions
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_list_arrays() {
let data = vec![
Some(vec![Some(0), Some(1), Some(2)]),
None,
Some(vec![Some(3), None, Some(5)]),
Some(vec![Some(3), None, Some(5)]),
None,
Some(vec![Some(0), Some(1), Some(2)]),
];
let list_array =
Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(data)) as ArrayRef;
let random_state = RandomState::with_seeds(0, 0, 0, 0);
let mut hashes = vec![0; list_array.len()];
create_hashes(&[list_array], &random_state, &mut hashes).unwrap();
assert_eq!(hashes[0], hashes[5]);
assert_eq!(hashes[1], hashes[4]);
assert_eq!(hashes[2], hashes[3]);
}

#[test]
// Tests actual values of hashes, which are different if forcing collisions
#[cfg(not(feature = "force_hash_collisions"))]
Expand Down
7 changes: 7 additions & 0 deletions datafusion/sqllogictest/test_files/groupby.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3250,6 +3250,13 @@ ORDER BY l.sn
----
0 30 30

# Should support grouping by list column
query ?I
SELECT column1, COUNT(*) as column2 FROM (VALUES (['a', 'b'], 1), (['c', 'd', 'e'], 2), (['a', 'b'], 3)) as values0 GROUP BY column1 ORDER BY column2;
----
[c, d, e] 1
[a, b] 2


# primary key should be aware from which columns it is associated
statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression r.sn could not be resolved from available columns: l.sn, SUM\(l.amount\)
Expand Down