Skip to content

Commit 486c5d8

Browse files
adriangbdavidhewittalamb
authored
Refactor InListExpr to support structs by re-using existing hashing infrastructure (apache#18449)
## Background This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in apache#17171. A "target state" is tracked in apache#18393. There is a series of PRs to get us to this target state in smaller more reviewable changes that are still valuable on their own: - apache#18448 - (This PR): apache#18449 (depends on apache#18448) - apache#18451 ## Changes in this PR - Enhance InListExpr to efficiently store homogeneous lists as arrays and avoid a conversion to Vec<PhysicalExpr> by adding an internal InListStorage enum with Array and Exprs variants - Re-use existing hashing and comparison utilities to support Struct arrays and other complex types - Add public function `in_list_from_array(expr, list_array, negated)` for creating InList from arrays Although the diff looks large most of it is actually tests and docs. I think the actual code change is a negative LOC change, or at least negative complexity (eliminates a trait, a macro, matching on data types). --------- Co-authored-by: David Hewitt <mail@davidhewitt.dev> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 6601959 commit 486c5d8

File tree

7 files changed

+1877
-197
lines changed

7 files changed

+1877
-197
lines changed

datafusion/common/src/hash_utils.rs

Lines changed: 173 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ use crate::cast::{
3131
as_string_array, as_string_view_array, as_struct_array,
3232
};
3333
use crate::error::Result;
34-
#[cfg(not(feature = "force_hash_collisions"))]
35-
use crate::error::_internal_err;
34+
use crate::error::{_internal_datafusion_err, _internal_err};
35+
use std::cell::RefCell;
3636

3737
// Combines two hashes into one hash
3838
#[inline]
@@ -41,6 +41,94 @@ pub fn combine_hashes(l: u64, r: u64) -> u64 {
4141
hash.wrapping_mul(37).wrapping_add(r)
4242
}
4343

44+
/// Maximum size for the thread-local hash buffer before truncation (4MB = 524,288 u64 elements).
45+
/// The goal of this is to avoid unbounded memory growth that would appear as a memory leak.
46+
/// We allow temporary allocations beyond this size, but after use the buffer is truncated
47+
/// to this size.
48+
const MAX_BUFFER_SIZE: usize = 524_288;
49+
50+
thread_local! {
51+
/// Thread-local buffer for hash computations to avoid repeated allocations.
52+
/// The buffer is reused across calls and truncated if it exceeds MAX_BUFFER_SIZE.
53+
/// Defaults to a capacity of 8192 u64 elements which is the default batch size.
54+
/// This corresponds to 64KB of memory.
55+
static HASH_BUFFER: RefCell<Vec<u64>> = const { RefCell::new(Vec::new()) };
56+
}
57+
58+
/// Creates hashes for the given arrays using a thread-local buffer, then calls the provided callback
59+
/// with an immutable reference to the computed hashes.
60+
///
61+
/// This function manages a thread-local buffer to avoid repeated allocations. The buffer is automatically
62+
/// truncated if it exceeds `MAX_BUFFER_SIZE` after use.
63+
///
64+
/// # Arguments
65+
/// * `arrays` - The arrays to hash (must contain at least one array)
66+
/// * `random_state` - The random state for hashing
67+
/// * `callback` - A function that receives an immutable reference to the hash slice and returns a result
68+
///
69+
/// # Errors
70+
/// Returns an error if:
71+
/// - No arrays are provided
72+
/// - The function is called reentrantly (i.e., the callback invokes `with_hashes` again on the same thread)
73+
/// - The function is called during or after thread destruction
74+
///
75+
/// # Example
76+
/// ```ignore
77+
/// use datafusion_common::hash_utils::{with_hashes, RandomState};
78+
/// use arrow::array::{Int32Array, ArrayRef};
79+
/// use std::sync::Arc;
80+
///
81+
/// let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
82+
/// let random_state = RandomState::new();
83+
///
84+
/// let result = with_hashes([&array], &random_state, |hashes| {
85+
/// // Use the hashes here
86+
/// Ok(hashes.len())
87+
/// })?;
88+
/// ```
89+
pub fn with_hashes<I, T, F, R>(
90+
arrays: I,
91+
random_state: &RandomState,
92+
callback: F,
93+
) -> Result<R>
94+
where
95+
I: IntoIterator<Item = T>,
96+
T: AsDynArray,
97+
F: FnOnce(&[u64]) -> Result<R>,
98+
{
99+
// Peek at the first array to determine buffer size without fully collecting
100+
let mut iter = arrays.into_iter().peekable();
101+
102+
// Get the required size from the first array
103+
let required_size = match iter.peek() {
104+
Some(arr) => arr.as_dyn_array().len(),
105+
None => return _internal_err!("with_hashes requires at least one array"),
106+
};
107+
108+
HASH_BUFFER.try_with(|cell| {
109+
let mut buffer = cell.try_borrow_mut()
110+
.map_err(|_| _internal_datafusion_err!("with_hashes cannot be called reentrantly on the same thread"))?;
111+
112+
// Ensure buffer has sufficient length, clearing old values
113+
buffer.clear();
114+
buffer.resize(required_size, 0);
115+
116+
// Create hashes in the buffer - this consumes the iterator
117+
create_hashes(iter, random_state, &mut buffer[..required_size])?;
118+
119+
// Execute the callback with an immutable slice
120+
let result = callback(&buffer[..required_size])?;
121+
122+
// Cleanup: truncate if buffer grew too large
123+
if buffer.capacity() > MAX_BUFFER_SIZE {
124+
buffer.truncate(MAX_BUFFER_SIZE);
125+
buffer.shrink_to_fit();
126+
}
127+
128+
Ok(result)
129+
}).map_err(|_| _internal_datafusion_err!("with_hashes cannot access thread-local storage during or after thread destruction"))?
130+
}
131+
44132
#[cfg(not(feature = "force_hash_collisions"))]
45133
fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col: bool) {
46134
if mul_col {
@@ -478,8 +566,8 @@ impl AsDynArray for &ArrayRef {
478566
pub fn create_hashes<'a, I, T>(
479567
arrays: I,
480568
random_state: &RandomState,
481-
hashes_buffer: &'a mut Vec<u64>,
482-
) -> Result<&'a mut Vec<u64>>
569+
hashes_buffer: &'a mut [u64],
570+
) -> Result<&'a mut [u64]>
483571
where
484572
I: IntoIterator<Item = T>,
485573
T: AsDynArray,
@@ -522,7 +610,7 @@ mod tests {
522610
fn create_hashes_for_empty_fixed_size_lit() -> Result<()> {
523611
let empty_array = FixedSizeListBuilder::new(StringBuilder::new(), 1).finish();
524612
let random_state = RandomState::with_seeds(0, 0, 0, 0);
525-
let hashes_buff = &mut vec![0; 0];
613+
let hashes_buff = &mut [0; 0];
526614
let hashes = create_hashes(
527615
&[Arc::new(empty_array) as ArrayRef],
528616
&random_state,
@@ -1000,4 +1088,84 @@ mod tests {
10001088

10011089
assert_eq!(hashes1, hashes2);
10021090
}
1091+
1092+
#[test]
1093+
fn test_with_hashes() {
1094+
let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
1095+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
1096+
1097+
// Test that with_hashes produces the same results as create_hashes
1098+
let mut expected_hashes = vec![0; array.len()];
1099+
create_hashes([&array], &random_state, &mut expected_hashes).unwrap();
1100+
1101+
let result = with_hashes([&array], &random_state, |hashes| {
1102+
assert_eq!(hashes.len(), 4);
1103+
// Verify hashes match expected values
1104+
assert_eq!(hashes, &expected_hashes[..]);
1105+
// Return a copy of the hashes
1106+
Ok(hashes.to_vec())
1107+
})
1108+
.unwrap();
1109+
1110+
// Verify callback result is returned correctly
1111+
assert_eq!(result, expected_hashes);
1112+
}
1113+
1114+
#[test]
1115+
fn test_with_hashes_multi_column() {
1116+
let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
1117+
let str_array: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
1118+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
1119+
1120+
// Test multi-column hashing
1121+
let mut expected_hashes = vec![0; int_array.len()];
1122+
create_hashes(
1123+
[&int_array, &str_array],
1124+
&random_state,
1125+
&mut expected_hashes,
1126+
)
1127+
.unwrap();
1128+
1129+
with_hashes([&int_array, &str_array], &random_state, |hashes| {
1130+
assert_eq!(hashes.len(), 3);
1131+
assert_eq!(hashes, &expected_hashes[..]);
1132+
Ok(())
1133+
})
1134+
.unwrap();
1135+
}
1136+
1137+
#[test]
1138+
fn test_with_hashes_empty_arrays() {
1139+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
1140+
1141+
// Test that passing no arrays returns an error
1142+
let empty: [&ArrayRef; 0] = [];
1143+
let result = with_hashes(empty, &random_state, |_hashes| Ok(()));
1144+
1145+
assert!(result.is_err());
1146+
assert!(result
1147+
.unwrap_err()
1148+
.to_string()
1149+
.contains("requires at least one array"));
1150+
}
1151+
1152+
#[test]
1153+
fn test_with_hashes_reentrancy() {
1154+
let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
1155+
let array2: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
1156+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
1157+
1158+
// Test that reentrant calls return an error instead of panicking
1159+
let result = with_hashes([&array], &random_state, |_hashes| {
1160+
// Try to call with_hashes again inside the callback
1161+
with_hashes([&array2], &random_state, |_inner_hashes| Ok(()))
1162+
});
1163+
1164+
assert!(result.is_err());
1165+
let err_msg = result.unwrap_err().to_string();
1166+
assert!(
1167+
err_msg.contains("reentrantly") || err_msg.contains("cannot be called"),
1168+
"Error message should mention reentrancy: {err_msg}",
1169+
);
1170+
}
10031171
}

0 commit comments

Comments
 (0)