From 2c83b0225af826d6ee5f35e2b97edfc577868ee6 Mon Sep 17 00:00:00 2001
From: Jon Mease
Date: Fri, 22 Sep 2023 07:56:34 -0400
Subject: [PATCH] Support hashing List columns (#7616)

* Hash ListArray

* Implement hash join for list arrays

* add sqllogic test for grouping by list column

* reset parquet-testing

* reset testing

* clippy

---
 datafusion/expr/src/utils.rs                |  2 +
 datafusion/physical-expr/src/hash_utils.rs  | 63 +++++++++++++++++++
 .../sqllogictest/test_files/groupby.slt     |  7 +++
 3 files changed, 72 insertions(+)

diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index e94d5f4b3fc5..54a1ce348bf9 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -917,6 +917,8 @@ pub fn can_hash(data_type: &DataType) -> bool {
         {
             DataType::is_dictionary_key_type(key_type)
         }
+        DataType::List(_) => true,
+        DataType::LargeList(_) => true,
         _ => false,
     }
 }
diff --git a/datafusion/physical-expr/src/hash_utils.rs b/datafusion/physical-expr/src/hash_utils.rs
index 227caaf4aab4..379e0eba5277 100644
--- a/datafusion/physical-expr/src/hash_utils.rs
+++ b/datafusion/physical-expr/src/hash_utils.rs
@@ -207,6 +207,39 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
     Ok(())
 }
 
+fn hash_list_array<OffsetSize>(
+    array: &GenericListArray<OffsetSize>,
+    random_state: &RandomState,
+    hashes_buffer: &mut [u64],
+) -> Result<()>
+where
+    OffsetSize: OffsetSizeTrait,
+{
+    let values = array.values().clone();
+    let offsets = array.value_offsets();
+    let nulls = array.nulls();
+    let mut values_hashes = vec![0u64; values.len()];
+    create_hashes(&[values], random_state, &mut values_hashes)?;
+    if let Some(nulls) = nulls {
+        for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
+            if nulls.is_valid(i) {
+                let hash = &mut hashes_buffer[i];
+                for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
+                    *hash = combine_hashes(*hash, *values_hash);
+                }
+            }
+        }
+    } else {
+        for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
+            let hash = &mut hashes_buffer[i];
+            for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
+                *hash = combine_hashes(*hash, *values_hash);
+            }
+        }
+    }
+    Ok(())
+}
+
 /// Test version of `create_hashes` that produces the same value for
 /// all hashes (to test collisions)
 ///
@@ -294,6 +327,14 @@ pub fn create_hashes<'a>(
                 array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
                 _ => unreachable!()
             }
+            DataType::List(_) => {
+                let array = as_list_array(array);
+                hash_list_array(array, random_state, hashes_buffer)?;
+            }
+            DataType::LargeList(_) => {
+                let array = as_large_list_array(array);
+                hash_list_array(array, random_state, hashes_buffer)?;
+            }
             _ => {
                 // This is internal because we should have caught this before.
                 return internal_err!(
@@ -452,6 +493,28 @@ mod tests {
         assert_ne!(dict_hashes[0], dict_hashes[2]);
     }
 
+    #[test]
+    // Tests actual values of hashes, which are different if forcing collisions
+    #[cfg(not(feature = "force_hash_collisions"))]
+    fn create_hashes_for_list_arrays() {
+        let data = vec![
+            Some(vec![Some(0), Some(1), Some(2)]),
+            None,
+            Some(vec![Some(3), None, Some(5)]),
+            Some(vec![Some(3), None, Some(5)]),
+            None,
+            Some(vec![Some(0), Some(1), Some(2)]),
+        ];
+        let list_array =
+            Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(data)) as ArrayRef;
+        let random_state = RandomState::with_seeds(0, 0, 0, 0);
+        let mut hashes = vec![0; list_array.len()];
+        create_hashes(&[list_array], &random_state, &mut hashes).unwrap();
+        assert_eq!(hashes[0], hashes[5]);
+        assert_eq!(hashes[1], hashes[4]);
+        assert_eq!(hashes[2], hashes[3]);
+    }
+
     #[test]
     // Tests actual values of hashes, which are different if forcing collisions
     #[cfg(not(feature = "force_hash_collisions"))]
diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt
index c93617f352ad..7092c5933f5f 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -3250,6 +3250,13 @@ ORDER BY l.sn
 ----
 0 30 30
 
+# Should support grouping by list column
+query ?I
+SELECT column1, COUNT(*) as column2 FROM (VALUES (['a', 'b'], 1), (['c', 'd', 'e'], 2), (['a', 'b'], 3)) as values0 GROUP BY column1 ORDER BY column2;
+----
+[c, d, e] 1
+[a, b] 2
+
 # primary key should be aware from which columns it is associated
 statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression r.sn could not be resolved from available columns: l.sn, SUM\(l.amount\)
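
The core of hash_list_array in the patch is the per-row fold: every child value is hashed once, then each row's hash is built by folding the child hashes that fall inside that row's offset range. The standalone Rust sketch below shows the same idea without the Arrow types; the names hash_rows_of_list and the stand-in combine_hashes are illustrative only and are not part of the patch or of DataFusion's API.

    // Stand-in combiner for illustration; DataFusion defines its own combine_hashes
    // in hash_utils.rs.
    fn combine_hashes(l: u64, r: u64) -> u64 {
        let hash = (17 * 37u64).wrapping_add(l);
        hash.wrapping_mul(37).wrapping_add(r)
    }

    // Mirror of the non-null path in hash_list_array: for row i, fold the hashes of
    // the child values in offsets[i]..offsets[i + 1] into hashes_buffer[i].
    fn hash_rows_of_list(values_hashes: &[u64], offsets: &[usize], hashes_buffer: &mut [u64]) {
        for (i, window) in offsets.windows(2).enumerate() {
            let (start, stop) = (window[0], window[1]);
            for value_hash in &values_hashes[start..stop] {
                hashes_buffer[i] = combine_hashes(hashes_buffer[i], *value_hash);
            }
        }
    }

    fn main() {
        // Two rows: child values 0..2 belong to row 0, child value 2..3 to row 1.
        // The per-element hashes are made-up example numbers.
        let values_hashes = [1111_u64, 2222, 3333];
        let offsets = [0_usize, 2, 3];
        let mut hashes_buffer = vec![0_u64; 2];
        hash_rows_of_list(&values_hashes, &offsets, &mut hashes_buffer);
        println!("{hashes_buffer:?}");
    }

Because the fold visits elements in offset order, two rows with the same element sequence (as in the new create_hashes_for_list_arrays test) end up with equal hashes, which is what the GROUP BY and hash-join paths rely on.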