Support hashing List columns (apache#7616)
* Hash ListArray

* Implement hash join for list arrays

* add sqllogic test for grouping by list column

* reset parquet-testing

* reset testing

* clippy
jonmmease authored and Ted-Jiang committed Oct 7, 2023
1 parent 76ee890 commit 74c1fb6
Showing 3 changed files with 72 additions and 0 deletions.
2 changes: 2 additions & 0 deletions datafusion/expr/src/utils.rs
@@ -917,6 +917,8 @@ pub fn can_hash(data_type: &DataType) -> bool {
        {
            DataType::is_dictionary_key_type(key_type)
        }
        DataType::List(_) => true,
        DataType::LargeList(_) => true,
        _ => false,
    }
}
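With these two arms in place, the planner's hashability check accepts list-typed columns as grouping and join keys. A minimal usage sketch, assuming the public datafusion_expr::utils::can_hash function and an Int32 list element (not part of the diff):

use std::sync::Arc;

use arrow::datatypes::{DataType, Field};
use datafusion_expr::utils::can_hash;

fn main() {
    let item = Arc::new(Field::new("item", DataType::Int32, true));
    // Both list variants are now reported as hashable by the new match arms.
    assert!(can_hash(&DataType::List(item.clone())));
    assert!(can_hash(&DataType::LargeList(item)));
}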
63 changes: 63 additions & 0 deletions datafusion/physical-expr/src/hash_utils.rs
@@ -207,6 +207,39 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
    Ok(())
}

fn hash_list_array<OffsetSize>(
    array: &GenericListArray<OffsetSize>,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
) -> Result<()>
where
    OffsetSize: OffsetSizeTrait,
{
    let values = array.values().clone();
    let offsets = array.value_offsets();
    let nulls = array.nulls();
    let mut values_hashes = vec![0u64; values.len()];
    create_hashes(&[values], random_state, &mut values_hashes)?;
    if let Some(nulls) = nulls {
        for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
            if nulls.is_valid(i) {
                let hash = &mut hashes_buffer[i];
                for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
                    *hash = combine_hashes(*hash, *values_hash);
                }
            }
        }
    } else {
        for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
            let hash = &mut hashes_buffer[i];
            for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
                *hash = combine_hashes(*hash, *values_hash);
            }
        }
    }
    Ok(())
}
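
For orientation, the offset arithmetic above can be seen in isolation: row i of a list array owns the child values at offsets[i]..offsets[i + 1], and its hash is a fold of those child hashes, with null rows skipped. A standalone sketch assuming only the arrow crate (not part of the diff):

use arrow::array::{Array, ListArray};
use arrow::datatypes::Int32Type;

fn main() {
    let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![Some(0), Some(1), Some(2)]),
        None,
        Some(vec![Some(3), None, Some(5)]),
    ]);
    let offsets = list.value_offsets();
    for i in 0..list.len() {
        if list.is_null(i) {
            // hash_list_array leaves the hash slot of a null row untouched.
            println!("row {i}: null");
            continue;
        }
        // Child indices whose element hashes are combined into row i's hash.
        let (start, stop) = (offsets[i] as usize, offsets[i + 1] as usize);
        println!("row {i}: child values {start}..{stop}");
    }
}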

/// Test version of `create_hashes` that produces the same value for
/// all hashes (to test collisions)
///
@@ -294,6 +327,14 @@ pub fn create_hashes<'a>(
                array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
                _ => unreachable!()
            }
            DataType::List(_) => {
                let array = as_list_array(array);
                hash_list_array(array, random_state, hashes_buffer)?;
            }
            DataType::LargeList(_) => {
                let array = as_large_list_array(array);
                hash_list_array(array, random_state, hashes_buffer)?;
            }
            _ => {
                // This is internal because we should have caught this before.
                return internal_err!(
@@ -452,6 +493,28 @@ mod tests {
        assert_ne!(dict_hashes[0], dict_hashes[2]);
    }

    #[test]
    // Tests actual values of hashes, which are different if forcing collisions
    #[cfg(not(feature = "force_hash_collisions"))]
    fn create_hashes_for_list_arrays() {
        let data = vec![
            Some(vec![Some(0), Some(1), Some(2)]),
            None,
            Some(vec![Some(3), None, Some(5)]),
            Some(vec![Some(3), None, Some(5)]),
            None,
            Some(vec![Some(0), Some(1), Some(2)]),
        ];
        let list_array =
            Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(data)) as ArrayRef;
        let random_state = RandomState::with_seeds(0, 0, 0, 0);
        let mut hashes = vec![0; list_array.len()];
        create_hashes(&[list_array], &random_state, &mut hashes).unwrap();
        assert_eq!(hashes[0], hashes[5]);
        assert_eq!(hashes[1], hashes[4]);
        assert_eq!(hashes[2], hashes[3]);
    }

    #[test]
    // Tests actual values of hashes, which are different if forcing collisions
    #[cfg(not(feature = "force_hash_collisions"))]
7 changes: 7 additions & 0 deletions datafusion/sqllogictest/test_files/groupby.slt
@@ -3250,6 +3250,13 @@ ORDER BY l.sn
----
0 30 30

# Should support grouping by list column
query ?I
SELECT column1, COUNT(*) as column2 FROM (VALUES (['a', 'b'], 1), (['c', 'd', 'e'], 2), (['a', 'b'], 3)) as values0 GROUP BY column1 ORDER BY column2;
----
[c, d, e] 1
[a, b] 2


# primary key should be aware from which columns it is associated
statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression r.sn could not be resolved from available columns: l.sn, SUM\(l.amount\)
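The grouping exercised by the sqllogictest above can also be driven from Rust. A minimal hedged sketch, assuming the datafusion and tokio crates (not part of the diff); the expected output matches the test: [c, d, e] with count 1 and [a, b] with count 2.

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Grouping on column1, a list column, now goes through the hash_list_array path.
    let df = ctx
        .sql(
            "SELECT column1, COUNT(*) AS column2 \
             FROM (VALUES (['a', 'b'], 1), (['c', 'd', 'e'], 2), (['a', 'b'], 3)) AS values0 \
             GROUP BY column1 ORDER BY column2",
        )
        .await?;
    df.show().await?;
    Ok(())
}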
