Skip to content

Commit 3869857

Browse files
authored
cache generation of dictionary keys and null arrays for ScalarValue (#16789)
1 parent eb25e8d commit 3869857

File tree

2 files changed

+221
-10
lines changed

2 files changed

+221
-10
lines changed
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Array caching utilities for scalar values
19+
20+
use std::iter::repeat_n;
21+
use std::sync::{Arc, LazyLock, Mutex};
22+
23+
use arrow::array::{new_null_array, Array, ArrayRef, PrimitiveArray};
24+
use arrow::datatypes::{
25+
ArrowDictionaryKeyType, DataType, Int16Type, Int32Type, Int64Type, Int8Type,
26+
UInt16Type, UInt32Type, UInt64Type, UInt8Type,
27+
};
28+
29+
/// Largest row count for which an array is kept in a cache; requests above
/// this limit are built on demand and never stored, to bound memory usage.
const MAX_CACHE_SIZE: usize = 1 << 20;
31+
32+
/// Cache for dictionary key arrays to avoid repeated allocations
33+
/// when the same size is used frequently.
34+
///
35+
/// Similar to PartitionColumnProjector's ZeroBufferGenerators, this cache
36+
/// stores key arrays for different dictionary key types. The cache is
37+
/// limited to 1 entry per type (the last size used) to prevent memory leaks
38+
/// for extremely large array requests.
39+
#[derive(Debug)]
40+
struct KeyArrayCache<K: ArrowDictionaryKeyType> {
41+
cache: Option<(usize, bool, PrimitiveArray<K>)>, // (num_rows, is_null, key_array)
42+
}
43+
44+
impl<K: ArrowDictionaryKeyType> Default for KeyArrayCache<K> {
45+
fn default() -> Self {
46+
Self { cache: None }
47+
}
48+
}
49+
50+
impl<K: ArrowDictionaryKeyType> KeyArrayCache<K> {
51+
/// Get or create a cached key array for the given number of rows and null status
52+
fn get_or_create(&mut self, num_rows: usize, is_null: bool) -> PrimitiveArray<K> {
53+
// Check cache size limit to prevent memory leaks
54+
if num_rows > MAX_CACHE_SIZE {
55+
// For very large arrays, don't cache them - just create and return
56+
return self.create_key_array(num_rows, is_null);
57+
}
58+
59+
match &self.cache {
60+
Some((cached_num_rows, cached_is_null, cached_array))
61+
if *cached_num_rows == num_rows && *cached_is_null == is_null =>
62+
{
63+
// Cache hit: reuse existing array if same size and null status
64+
cached_array.clone()
65+
}
66+
_ => {
67+
// Cache miss: create new array and cache it
68+
let key_array = self.create_key_array(num_rows, is_null);
69+
self.cache = Some((num_rows, is_null, key_array.clone()));
70+
key_array
71+
}
72+
}
73+
}
74+
75+
/// Create a new key array with the specified number of rows and null status
76+
fn create_key_array(&self, num_rows: usize, is_null: bool) -> PrimitiveArray<K> {
77+
let key_array: PrimitiveArray<K> = repeat_n(
78+
if is_null {
79+
None
80+
} else {
81+
Some(K::default_value())
82+
},
83+
num_rows,
84+
)
85+
.collect();
86+
key_array
87+
}
88+
}
89+
90+
/// Cache for null arrays to avoid repeated allocations
91+
/// when the same size is used frequently.
92+
#[derive(Debug, Default)]
93+
struct NullArrayCache {
94+
cache: Option<(usize, ArrayRef)>, // (num_rows, null_array)
95+
}
96+
97+
impl NullArrayCache {
98+
/// Get or create a cached null array for the given number of rows
99+
fn get_or_create(&mut self, num_rows: usize) -> ArrayRef {
100+
// Check cache size limit to prevent memory leaks
101+
if num_rows > MAX_CACHE_SIZE {
102+
// For very large arrays, don't cache them - just create and return
103+
return new_null_array(&DataType::Null, num_rows);
104+
}
105+
106+
match &self.cache {
107+
Some((cached_num_rows, cached_array)) if *cached_num_rows == num_rows => {
108+
// Cache hit: reuse existing array if same size
109+
Arc::clone(cached_array)
110+
}
111+
_ => {
112+
// Cache miss: create new array and cache it
113+
let null_array = new_null_array(&DataType::Null, num_rows);
114+
self.cache = Some((num_rows, Arc::clone(&null_array)));
115+
null_array
116+
}
117+
}
118+
}
119+
}
120+
121+
/// Global cache for dictionary key arrays and null arrays
122+
#[derive(Debug, Default)]
123+
struct ArrayCaches {
124+
cache_i8: KeyArrayCache<Int8Type>,
125+
cache_i16: KeyArrayCache<Int16Type>,
126+
cache_i32: KeyArrayCache<Int32Type>,
127+
cache_i64: KeyArrayCache<Int64Type>,
128+
cache_u8: KeyArrayCache<UInt8Type>,
129+
cache_u16: KeyArrayCache<UInt16Type>,
130+
cache_u32: KeyArrayCache<UInt32Type>,
131+
cache_u64: KeyArrayCache<UInt64Type>,
132+
null_cache: NullArrayCache,
133+
}
134+
135+
static ARRAY_CACHES: LazyLock<Mutex<ArrayCaches>> =
136+
LazyLock::new(|| Mutex::new(ArrayCaches::default()));
137+
138+
/// Get the global cache for arrays
139+
fn get_array_caches() -> &'static Mutex<ArrayCaches> {
140+
&ARRAY_CACHES
141+
}
142+
143+
/// Get or create a cached null array for the given number of rows
144+
pub(crate) fn get_or_create_cached_null_array(num_rows: usize) -> ArrayRef {
145+
let cache = get_array_caches();
146+
let mut caches = cache.lock().unwrap();
147+
caches.null_cache.get_or_create(num_rows)
148+
}
149+
150+
/// Get or create a cached key array for a specific key type
151+
pub(crate) fn get_or_create_cached_key_array<K: ArrowDictionaryKeyType>(
152+
num_rows: usize,
153+
is_null: bool,
154+
) -> PrimitiveArray<K> {
155+
let cache = get_array_caches();
156+
let mut caches = cache.lock().unwrap();
157+
158+
// Use the DATA_TYPE to dispatch to the correct cache, similar to original implementation
159+
match K::DATA_TYPE {
160+
DataType::Int8 => {
161+
let array = caches.cache_i8.get_or_create(num_rows, is_null);
162+
// Convert using ArrayData to avoid unsafe transmute
163+
let array_data = array.to_data();
164+
PrimitiveArray::<K>::from(array_data)
165+
}
166+
DataType::Int16 => {
167+
let array = caches.cache_i16.get_or_create(num_rows, is_null);
168+
let array_data = array.to_data();
169+
PrimitiveArray::<K>::from(array_data)
170+
}
171+
DataType::Int32 => {
172+
let array = caches.cache_i32.get_or_create(num_rows, is_null);
173+
let array_data = array.to_data();
174+
PrimitiveArray::<K>::from(array_data)
175+
}
176+
DataType::Int64 => {
177+
let array = caches.cache_i64.get_or_create(num_rows, is_null);
178+
let array_data = array.to_data();
179+
PrimitiveArray::<K>::from(array_data)
180+
}
181+
DataType::UInt8 => {
182+
let array = caches.cache_u8.get_or_create(num_rows, is_null);
183+
let array_data = array.to_data();
184+
PrimitiveArray::<K>::from(array_data)
185+
}
186+
DataType::UInt16 => {
187+
let array = caches.cache_u16.get_or_create(num_rows, is_null);
188+
let array_data = array.to_data();
189+
PrimitiveArray::<K>::from(array_data)
190+
}
191+
DataType::UInt32 => {
192+
let array = caches.cache_u32.get_or_create(num_rows, is_null);
193+
let array_data = array.to_data();
194+
PrimitiveArray::<K>::from(array_data)
195+
}
196+
DataType::UInt64 => {
197+
let array = caches.cache_u64.get_or_create(num_rows, is_null);
198+
let array_data = array.to_data();
199+
PrimitiveArray::<K>::from(array_data)
200+
}
201+
_ => {
202+
// Fallback for unsupported types - create array directly without caching
203+
let key_array: PrimitiveArray<K> = repeat_n(
204+
if is_null {
205+
None
206+
} else {
207+
Some(K::default_value())
208+
},
209+
num_rows,
210+
)
211+
.collect();
212+
key_array
213+
}
214+
}
215+
}

datafusion/common/src/scalar/mod.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
//! [`ScalarValue`]: stores single values
1919
20+
mod cache;
2021
mod consts;
2122
mod struct_builder;
2223

@@ -81,6 +82,7 @@ use arrow::datatypes::{
8182
UInt32Type, UInt64Type, UInt8Type, UnionFields, UnionMode, DECIMAL128_MAX_PRECISION,
8283
};
8384
use arrow::util::display::{array_value_to_string, ArrayFormatter, FormatOptions};
85+
use cache::{get_or_create_cached_key_array, get_or_create_cached_null_array};
8486
use chrono::{Duration, NaiveDate};
8587
use half::f16;
8688
pub use struct_builder::ScalarStructBuilder;
@@ -864,15 +866,9 @@ fn dict_from_scalar<K: ArrowDictionaryKeyType>(
864866
let values_array = value.to_array_of_size(1)?;
865867

866868
// Create a key array with `size` elements, each of 0
867-
let key_array: PrimitiveArray<K> = repeat_n(
868-
if value.is_null() {
869-
None
870-
} else {
871-
Some(K::default_value())
872-
},
873-
size,
874-
)
875-
.collect();
869+
// Use cache to avoid repeated allocations for the same size
870+
let key_array: PrimitiveArray<K> =
871+
get_or_create_cached_key_array::<K>(size, value.is_null());
876872

877873
// create a new DictionaryArray
878874
//
@@ -2677,7 +2673,7 @@ impl ScalarValue {
26772673
_ => unreachable!("Invalid dictionary keys type: {:?}", key_type),
26782674
}
26792675
}
2680-
ScalarValue::Null => new_null_array(&DataType::Null, size),
2676+
ScalarValue::Null => get_or_create_cached_null_array(size),
26812677
})
26822678
}
26832679

0 commit comments

Comments
 (0)