Commit 4737894

Refactor create_hashes to accept array references
Change create_hashes and related functions to work with &dyn Array references instead of requiring ArrayRef (Arc-wrapped arrays). This avoids unnecessary Arc::clone() calls and lets callers that only have an &dyn Array use the hashing utilities.

Changes:

- Add create_hashes_from_arrays(&[&dyn Array]) function
- Refactor hash_dictionary, hash_list_array, hash_fixed_list_array to use references instead of cloning
- Extract hash_single_array() helper for common logic
1 parent f57da83 commit 4737894
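
For illustration only (not part of this commit's diff): a minimal sketch of the before/after call pattern, assuming the post-commit `datafusion_common` and `arrow` APIs. The variable names, example values, and seeds below are made up for this sketch.

    use std::sync::Arc;

    use ahash::RandomState;
    use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
    use datafusion_common::hash_utils::{create_hashes, create_hashes_from_arrays};

    // Illustrative sketch; not code from this commit.
    fn main() -> datafusion_common::Result<()> {
        let ints = Int32Array::from(vec![1, 2, 3]);
        let strings = StringArray::from(vec!["a", "b", "c"]);
        let random_state = RandomState::with_seeds(0, 0, 0, 0);

        // New entry point: hash plain `&dyn Array` references, no Arc involved.
        let arrays: [&dyn Array; 2] = [&ints, &strings];
        let mut hashes = vec![0u64; ints.len()];
        create_hashes_from_arrays(&arrays, &random_state, &mut hashes)?;

        // Existing entry point: still takes Arc-wrapped `ArrayRef`s and now
        // delegates to the same per-array helper internally.
        let columns: Vec<ArrayRef> = vec![Arc::new(ints), Arc::new(strings)];
        let mut hashes_via_refs = vec![0u64; columns[0].len()];
        create_hashes(&columns, &random_state, &mut hashes_via_refs)?;

        assert_eq!(hashes, hashes_via_refs);
        Ok(())
    }

The motivation is visible in the diffs below: call sites such as hash_dictionary, binary_map.rs, and the hash join tests previously had to Arc::clone() a borrowed array into an ArrayRef just to hash it, and can now pass a &dyn Array directly.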

5 files changed: 127 additions & 85 deletions


datafusion/common/src/hash_utils.rs

Lines changed: 111 additions & 69 deletions
@@ -17,9 +17,6 @@
 
 //! Functionality used both on logical and physical plans
 
-#[cfg(not(feature = "force_hash_collisions"))]
-use std::sync::Arc;
-
 use ahash::RandomState;
 use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
 use arrow::array::*;
@@ -215,12 +212,11 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
     // Hash each dictionary value once, and then use that computed
     // hash for each key value to avoid a potentially expensive
     // redundant hashing for large dictionary elements (e.g. strings)
-    let dict_values = Arc::clone(array.values());
+    let dict_values = array.values();
     let mut dict_hashes = vec![0; dict_values.len()];
-    create_hashes(&[dict_values], random_state, &mut dict_hashes)?;
+    create_hashes_from_arrays(&[dict_values.as_ref()], random_state, &mut dict_hashes)?;
 
     // combine hash for each index in values
-    let dict_values = array.values();
     for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
         if let Some(key) = key {
             let idx = key.as_usize();
@@ -308,11 +304,11 @@ fn hash_list_array<OffsetSize>(
 where
     OffsetSize: OffsetSizeTrait,
 {
-    let values = Arc::clone(array.values());
+    let values = array.values();
     let offsets = array.value_offsets();
     let nulls = array.nulls();
     let mut values_hashes = vec![0u64; values.len()];
-    create_hashes(&[values], random_state, &mut values_hashes)?;
+    create_hashes_from_arrays(&[values.as_ref()], random_state, &mut values_hashes)?;
     if let Some(nulls) = nulls {
         for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
             if nulls.is_valid(i) {
@@ -339,11 +335,11 @@ fn hash_fixed_list_array(
     random_state: &RandomState,
     hashes_buffer: &mut [u64],
 ) -> Result<()> {
-    let values = Arc::clone(array.values());
+    let values = array.values();
     let value_length = array.value_length() as usize;
     let nulls = array.nulls();
     let mut values_hashes = vec![0u64; values.len()];
-    create_hashes(&[values], random_state, &mut values_hashes)?;
+    create_hashes_from_arrays(&[values.as_ref()], random_state, &mut values_hashes)?;
     if let Some(nulls) = nulls {
         for i in 0..array.len() {
             if nulls.is_valid(i) {
@@ -366,83 +362,113 @@ fn hash_fixed_list_array(
     Ok(())
 }
 
-/// Test version of `create_hashes` that produces the same value for
-/// all hashes (to test collisions)
-///
-/// See comments on `hashes_buffer` for more details
+/// Internal helper function that hashes a single array and either initializes or combines
+/// the hash values in the buffer.
+#[cfg(not(feature = "force_hash_collisions"))]
+fn hash_single_array(
+    array: &dyn Array,
+    random_state: &RandomState,
+    hashes_buffer: &mut [u64],
+    rehash: bool,
+) -> Result<()> {
+    downcast_primitive_array! {
+        array => hash_array_primitive(array, random_state, hashes_buffer, rehash),
+        DataType::Null => hash_null(random_state, hashes_buffer, rehash),
+        DataType::Boolean => hash_array(as_boolean_array(array)?, random_state, hashes_buffer, rehash),
+        DataType::Utf8 => hash_array(as_string_array(array)?, random_state, hashes_buffer, rehash),
+        DataType::Utf8View => hash_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash),
+        DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, rehash),
+        DataType::Binary => hash_array(as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
+        DataType::BinaryView => hash_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
+        DataType::LargeBinary => hash_array(as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
+        DataType::FixedSizeBinary(_) => {
+            let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
+            hash_array(array, random_state, hashes_buffer, rehash)
+        }
+        DataType::Dictionary(_, _) => downcast_dictionary_array! {
+            array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
+            _ => unreachable!()
+        }
+        DataType::Struct(_) => {
+            let array = as_struct_array(array)?;
+            hash_struct_array(array, random_state, hashes_buffer)?;
+        }
+        DataType::List(_) => {
+            let array = as_list_array(array)?;
+            hash_list_array(array, random_state, hashes_buffer)?;
+        }
+        DataType::LargeList(_) => {
+            let array = as_large_list_array(array)?;
+            hash_list_array(array, random_state, hashes_buffer)?;
+        }
+        DataType::Map(_, _) => {
+            let array = as_map_array(array)?;
+            hash_map_array(array, random_state, hashes_buffer)?;
+        }
+        DataType::FixedSizeList(_,_) => {
+            let array = as_fixed_size_list_array(array)?;
+            hash_fixed_list_array(array, random_state, hashes_buffer)?;
+        }
+        _ => {
+            // This is internal because we should have caught this before.
+            return _internal_err!(
+                "Unsupported data type in hasher: {}",
+                array.data_type()
+            );
+        }
+    }
+    Ok(())
+}
+
+/// Test version of `hash_single_array` that forces all hashes to collide to zero.
 #[cfg(feature = "force_hash_collisions")]
-pub fn create_hashes<'a>(
-    _arrays: &[ArrayRef],
+fn hash_single_array(
+    _array: &dyn Array,
     _random_state: &RandomState,
-    hashes_buffer: &'a mut Vec<u64>,
-) -> Result<&'a mut Vec<u64>> {
+    hashes_buffer: &mut [u64],
+    _rehash: bool,
+) -> Result<()> {
     for hash in hashes_buffer.iter_mut() {
         *hash = 0
     }
-    Ok(hashes_buffer)
+    Ok(())
 }
 
-/// Creates hash values for every row, based on the values in the
-/// columns.
+/// Creates hash values for every row, based on the values in the columns.
 ///
 /// The number of rows to hash is determined by `hashes_buffer.len()`.
 /// `hashes_buffer` should be pre-sized appropriately
-#[cfg(not(feature = "force_hash_collisions"))]
+///
+/// This is the same as [`create_hashes`] but accepts `&dyn Array`s instead of requiring
+/// `ArrayRef`s.
+pub fn create_hashes_from_arrays<'a>(
+    arrays: &[&dyn Array],
+    random_state: &RandomState,
+    hashes_buffer: &'a mut Vec<u64>,
+) -> Result<&'a mut Vec<u64>> {
+    for (i, &array) in arrays.iter().enumerate() {
+        // combine hashes with `combine_hashes` for all columns besides the first
+        let rehash = i >= 1;
+        hash_single_array(array, random_state, hashes_buffer, rehash)?;
+    }
+    Ok(hashes_buffer)
+}
+
+/// Creates hash values for every row, based on the values in the columns.
+///
+/// The number of rows to hash is determined by `hashes_buffer.len()`.
+/// `hashes_buffer` should be pre-sized appropriately.
+///
+/// This is the same as [`create_hashes_from_arrays`] but accepts `ArrayRef`s.
 pub fn create_hashes<'a>(
     arrays: &[ArrayRef],
     random_state: &RandomState,
     hashes_buffer: &'a mut Vec<u64>,
 ) -> Result<&'a mut Vec<u64>> {
-    for (i, col) in arrays.iter().enumerate() {
-        let array = col.as_ref();
+    for (i, array) in arrays.iter().enumerate() {
         // combine hashes with `combine_hashes` for all columns besides the first
         let rehash = i >= 1;
-        downcast_primitive_array! {
-            array => hash_array_primitive(array, random_state, hashes_buffer, rehash),
-            DataType::Null => hash_null(random_state, hashes_buffer, rehash),
-            DataType::Boolean => hash_array(as_boolean_array(array)?, random_state, hashes_buffer, rehash),
-            DataType::Utf8 => hash_array(as_string_array(array)?, random_state, hashes_buffer, rehash),
-            DataType::Utf8View => hash_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash),
-            DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, rehash),
-            DataType::Binary => hash_array(as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
-            DataType::BinaryView => hash_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
-            DataType::LargeBinary => hash_array(as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
-            DataType::FixedSizeBinary(_) => {
-                let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
-                hash_array(array, random_state, hashes_buffer, rehash)
-            }
-            DataType::Dictionary(_, _) => downcast_dictionary_array! {
-                array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
-                _ => unreachable!()
-            }
-            DataType::Struct(_) => {
-                let array = as_struct_array(array)?;
-                hash_struct_array(array, random_state, hashes_buffer)?;
-            }
-            DataType::List(_) => {
-                let array = as_list_array(array)?;
-                hash_list_array(array, random_state, hashes_buffer)?;
-            }
-            DataType::LargeList(_) => {
-                let array = as_large_list_array(array)?;
-                hash_list_array(array, random_state, hashes_buffer)?;
-            }
-            DataType::Map(_, _) => {
-                let array = as_map_array(array)?;
-                hash_map_array(array, random_state, hashes_buffer)?;
-            }
-            DataType::FixedSizeList(_,_) => {
-                let array = as_fixed_size_list_array(array)?;
-                hash_fixed_list_array(array, random_state, hashes_buffer)?;
-            }
-            _ => {
-                // This is internal because we should have caught this before.
-                return _internal_err!(
-                    "Unsupported data type in hasher: {}",
-                    col.data_type()
-                );
-            }
-        }
+        hash_single_array(array.as_ref(), random_state, hashes_buffer, rehash)?;
     }
     Ok(hashes_buffer)
 }
@@ -896,4 +922,20 @@ mod tests {
 
         assert_ne!(one_col_hashes, two_col_hashes);
     }
+
+    #[test]
+    fn test_create_hashes_from_arrays() {
+        let int_array = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
+        let float_array = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
+
+        let random_state = RandomState::with_seeds(0, 0, 0, 0);
+        let hashes_buff = &mut vec![0; int_array.len()];
+        let hashes = create_hashes_from_arrays(
+            &[int_array.as_ref(), float_array.as_ref()],
+            &random_state,
+            hashes_buff,
+        )
+        .unwrap();
+        assert_eq!(hashes.len(), 4,);
+    }
 }

datafusion/common/src/scalar/mod.rs

Lines changed: 3 additions & 3 deletions
@@ -51,7 +51,7 @@ use crate::cast::{
 };
 use crate::error::{DataFusionError, Result, _exec_err, _internal_err, _not_impl_err};
 use crate::format::DEFAULT_CAST_OPTIONS;
-use crate::hash_utils::create_hashes;
+use crate::hash_utils::create_hashes_from_arrays;
 use crate::utils::SingleRowListArrayBuilder;
 use crate::{_internal_datafusion_err, arrow_datafusion_err};
 use arrow::array::{
@@ -878,10 +878,10 @@ impl Hash for ScalarValue {
 
 fn hash_nested_array<H: Hasher>(arr: ArrayRef, state: &mut H) {
     let len = arr.len();
-    let arrays = vec![arr];
     let hashes_buffer = &mut vec![0; len];
     let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
-    let hashes = create_hashes(&arrays, &random_state, hashes_buffer).unwrap();
+    let hashes =
+        create_hashes_from_arrays(&[arr.as_ref()], &random_state, hashes_buffer).unwrap();
     // Hash back to std::hash::Hasher
     hashes.hash(state);
 }

datafusion/physical-expr-common/src/binary_map.rs

Lines changed: 2 additions & 2 deletions
@@ -27,7 +27,7 @@ use arrow::array::{
 };
 use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
 use arrow::datatypes::DataType;
-use datafusion_common::hash_utils::create_hashes;
+use datafusion_common::hash_utils::create_hashes_from_arrays;
 use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt};
 use std::any::type_name;
 use std::fmt::Debug;
@@ -349,7 +349,7 @@ where
         let batch_hashes = &mut self.hashes_buffer;
         batch_hashes.clear();
         batch_hashes.resize(values.len(), 0);
-        create_hashes(&[Arc::clone(values)], &self.random_state, batch_hashes)
+        create_hashes_from_arrays(&[values.as_ref()], &self.random_state, batch_hashes)
            // hash is supported for all types and create_hashes only
            // returns errors for unsupported types
            .unwrap();

datafusion/physical-expr-common/src/binary_view_map.rs

Lines changed: 2 additions & 2 deletions
@@ -23,7 +23,7 @@ use ahash::RandomState;
 use arrow::array::cast::AsArray;
 use arrow::array::{Array, ArrayBuilder, ArrayRef, GenericByteViewBuilder};
 use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType};
-use datafusion_common::hash_utils::create_hashes;
+use datafusion_common::hash_utils::create_hashes_from_arrays;
 use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt};
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -243,7 +243,7 @@ where
         let batch_hashes = &mut self.hashes_buffer;
         batch_hashes.clear();
         batch_hashes.resize(values.len(), 0);
-        create_hashes(&[Arc::clone(values)], &self.random_state, batch_hashes)
+        create_hashes_from_arrays(&[values.as_ref()], &self.random_state, batch_hashes)
            // hash is supported for all types and create_hashes only
            // returns errors for unsupported types
            .unwrap();

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 9 additions & 9 deletions
@@ -1515,7 +1515,7 @@ mod tests {
    use arrow::buffer::NullBuffer;
    use arrow::datatypes::{DataType, Field};
    use arrow_schema::Schema;
-   use datafusion_common::hash_utils::create_hashes;
+   use datafusion_common::hash_utils::create_hashes_from_arrays;
    use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
    use datafusion_common::{
        assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err,
@@ -3454,8 +3454,8 @@ mod tests {
 
        let random_state = RandomState::with_seeds(0, 0, 0, 0);
        let hashes_buff = &mut vec![0; left.num_rows()];
-       let hashes = create_hashes(
-           &[Arc::clone(&left.columns()[0])],
+       let hashes = create_hashes_from_arrays(
+           &[left.columns()[0].as_ref()],
            &random_state,
            hashes_buff,
        )?;
@@ -3487,8 +3487,8 @@ mod tests {
        let right_keys_values =
            key_column.evaluate(&right)?.into_array(right.num_rows())?;
        let mut hashes_buffer = vec![0; right.num_rows()];
-       create_hashes(
-           &[Arc::clone(&right_keys_values)],
+       create_hashes_from_arrays(
+           &[right_keys_values.as_ref()],
            &random_state,
            &mut hashes_buffer,
        )?;
@@ -3525,8 +3525,8 @@ mod tests {
 
        let random_state = RandomState::with_seeds(0, 0, 0, 0);
        let hashes_buff = &mut vec![0; left.num_rows()];
-       let hashes = create_hashes(
-           &[Arc::clone(&left.columns()[0])],
+       let hashes = create_hashes_from_arrays(
+           &[left.columns()[0].as_ref()],
            &random_state,
            hashes_buff,
        )?;
@@ -3552,8 +3552,8 @@ mod tests {
        let right_keys_values =
            key_column.evaluate(&right)?.into_array(right.num_rows())?;
        let mut hashes_buffer = vec![0; right.num_rows()];
-       create_hashes(
-           &[Arc::clone(&right_keys_values)],
+       create_hashes_from_arrays(
+           &[right_keys_values.as_ref()],
            &random_state,
            &mut hashes_buffer,
        )?;
