Commit c3648f4

Improve coalesce
1 parent f92ff18 commit c3648f4

File tree: 1 file changed, +59 −31 lines
arrow-select/src/coalesce.rs

Lines changed: 59 additions & 31 deletions
@@ -21,9 +21,9 @@
 //! [`filter`]: crate::filter::filter
 //! [`take`]: crate::take::take
 use crate::concat::concat_batches;
-use arrow_array::{
-    builder::StringViewBuilder, cast::AsArray, Array, ArrayRef, RecordBatch, RecordBatchOptions,
-};
+use arrow_array::StringViewArray;
+use arrow_array::{cast::AsArray, Array, ArrayRef, RecordBatch, RecordBatchOptions};
+use arrow_data::ByteView;
 use arrow_schema::{ArrowError, SchemaRef};
 use std::collections::VecDeque;
 use std::sync::Arc;
@@ -164,7 +164,7 @@ impl BatchCoalescer
             return Ok(());
         }

-        let mut batch = gc_string_view_batch(&batch);
+        let mut batch = gc_string_view_batch(batch);

         // If pushing this batch would exceed the target batch size,
         // finish the current batch and start a new one
@@ -242,14 +242,14 @@ impl BatchCoalescer
 /// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
 /// `StringViewArray` may only refer to a small portion of the buffer,
 /// significantly increasing memory usage.
-fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
-    let new_columns: Vec<ArrayRef> = batch
-        .columns()
-        .iter()
+fn gc_string_view_batch(batch: RecordBatch) -> RecordBatch {
+    let (schema, columns, num_rows) = batch.into_parts();
+    let new_columns: Vec<ArrayRef> = columns
+        .into_iter()
         .map(|c| {
             // Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long.
             let Some(s) = c.as_string_view_opt() else {
-                return Arc::clone(c);
+                return c;
             };
             let ideal_buffer_size: usize = s
                 .views()
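A note on why this compaction pays off, expanding on the doc comment in the hunk above: the view kernels (e.g. `filter`) copy only the surviving views and keep the original data buffers, so a heavily filtered `StringViewArray` can pin far more memory than it actually references. A minimal, illustrative sketch of that sparsity, assuming only the public `arrow_select::filter::filter`, `StringViewArray::from_iter_values`, and `Array::get_buffer_memory_size` APIs (row counts and string contents are made up):

use arrow_array::{Array, BooleanArray, StringViewArray};
use arrow_select::filter::filter;

fn main() {
    // 1000 strings, each longer than 12 bytes, so their bytes live in shared data buffers.
    let array = StringViewArray::from_iter_values(
        (0..1000).map(|i| format!("string value that is longer than twelve bytes #{i}")),
    );

    // Keep only the first row. The filter kernel copies the surviving views but
    // keeps the original data buffers, so the result is "sparse".
    let mut mask = vec![false; 1000];
    mask[0] = true;
    let filtered = filter(&array, &BooleanArray::from(mask)).unwrap();

    // The retained buffer memory stays close to the full input while only one
    // string is still referenced; gc_string_view_batch compacts exactly this case.
    println!(
        "rows: {}, retained buffer bytes: {}",
        filtered.len(),
        filtered.get_buffer_memory_size()
    );
}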
@@ -264,41 +264,65 @@ fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
                 })
                 .sum();
             let actual_buffer_size = s.get_buffer_memory_size();
+            let buffers = s.data_buffers();

             // Re-creating the array copies data and can be time consuming.
             // We only do it if the array is sparse
             if actual_buffer_size > (ideal_buffer_size * 2) {
                 // We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches.
                 // See https://github.com/apache/arrow-rs/issues/6094 for more details.
-                let mut builder = StringViewBuilder::with_capacity(s.len());
-                if ideal_buffer_size > 0 {
-                    builder = builder.with_fixed_block_size(ideal_buffer_size as u32);
-                }
-
-                for v in s.iter() {
-                    builder.append_option(v);
-                }
-
-                let gc_string = builder.finish();
-
-                debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0
+                let mut buffer: Vec<u8> = Vec::with_capacity(ideal_buffer_size);
+
+                let views: Vec<u128> = s
+                    .views()
+                    .as_ref()
+                    .iter()
+                    .cloned()
+                    .map(|v| {
+                        // SAFETY: ByteView has same memory layout as u128
+                        let mut b: ByteView = ByteView::from(v);
+
+                        if b.length > 12 {
+                            let offset = buffer.len() as u32;
+                            buffer.extend_from_slice(
+                                buffers[b.buffer_index as usize]
+                                    .get(b.offset as usize..b.offset as usize + b.length as usize)
+                                    .expect("Invalid buffer slice"),
+                            );
+                            b.offset = offset;
+                            b.buffer_index = 0; // Set buffer index to 0, as we only have one buffer
+                        }
+
+                        b.into()
+                    })
+                    .collect();
+
+                let buffers = if buffer.is_empty() {
+                    vec![]
+                } else {
+                    vec![buffer.into()]
+                };
+
+                let gc_string = unsafe {
+                    StringViewArray::new_unchecked(views.into(), buffers, s.nulls().cloned())
+                };

                 Arc::new(gc_string)
             } else {
-                Arc::clone(c)
+                c
             }
         })
         .collect();
     let mut options = RecordBatchOptions::new();
-    options = options.with_row_count(Some(batch.num_rows()));
-    RecordBatch::try_new_with_options(batch.schema(), new_columns, &options)
+    options = options.with_row_count(Some(num_rows));
+    RecordBatch::try_new_with_options(schema, new_columns, &options)
         .expect("Failed to re-create the gc'ed record batch")
 }

 #[cfg(test)]
 mod tests {
     use super::*;
-    use arrow_array::builder::ArrayBuilder;
+    use arrow_array::builder::{ArrayBuilder, StringViewBuilder};
     use arrow_array::{StringViewArray, UInt32Array};
     use arrow_schema::{DataType, Field, Schema};
     use std::ops::Range;
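The core of the new path in the hunk above is rewriting each 16-byte view instead of round-tripping through `StringViewBuilder`: values of 12 bytes or fewer are stored inline in the view itself, so only longer values need their bytes copied into the single packed buffer and their `buffer_index`/`offset` retargeted (the 4-byte prefix stays valid because the value's bytes do not change). A stripped-down sketch of that per-view rewrite outside the `RecordBatch` plumbing; the helper name `compact_views` is illustrative and not part of arrow-rs:

use arrow_buffer::Buffer;
use arrow_data::ByteView;

// Illustrative helper: rewrite raw u128 views so every out-of-line value points
// into a single, densely packed buffer (the same idea as the hunk above).
fn compact_views(views: &[u128], buffers: &[Buffer]) -> (Vec<u128>, Vec<u8>) {
    let mut packed: Vec<u8> = Vec::new();
    let new_views: Vec<u128> = views
        .iter()
        .map(|&v| {
            let mut view = ByteView::from(v);
            // Values longer than 12 bytes live in a data buffer; copy just the
            // referenced bytes and point the view at buffer 0.
            if view.length > 12 {
                let start = view.offset as usize;
                let end = start + view.length as usize;
                let bytes = &buffers[view.buffer_index as usize].as_slice()[start..end];
                view.offset = packed.len() as u32;
                view.buffer_index = 0;
                packed.extend_from_slice(bytes);
            }
            // Values of 12 bytes or fewer are stored inline and need no change.
            view.into()
        })
        .collect();
    (new_views, packed)
}

In the committed code the packed bytes become the sole data buffer handed to `StringViewArray::new_unchecked`, which is why the empty-buffer special case and the safety requirement that every rewritten view stay in bounds of that buffer matter.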
@@ -518,9 +542,11 @@ mod tests {
     fn test_gc_string_view_test_batch_empty() {
         let schema = Schema::empty();
         let batch = RecordBatch::new_empty(schema.into());
-        let output_batch = gc_string_view_batch(&batch);
-        assert_eq!(batch.num_columns(), output_batch.num_columns());
-        assert_eq!(batch.num_rows(), output_batch.num_rows());
+        let cols = batch.num_columns();
+        let num_rows = batch.num_rows();
+        let output_batch = gc_string_view_batch(batch);
+        assert_eq!(cols, output_batch.num_columns());
+        assert_eq!(num_rows, output_batch.num_rows());
     }

     #[test]
@@ -568,9 +594,11 @@ mod tests {
     /// and ensures the number of rows are the same
     fn do_gc(array: StringViewArray) -> StringViewArray {
         let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap();
-        let gc_batch = gc_string_view_batch(&batch);
-        assert_eq!(batch.num_rows(), gc_batch.num_rows());
-        assert_eq!(batch.schema(), gc_batch.schema());
+        let rows = batch.num_rows();
+        let schema = batch.schema();
+        let gc_batch = gc_string_view_batch(batch);
+        assert_eq!(rows, gc_batch.num_rows());
+        assert_eq!(schema, gc_batch.schema());
         gc_batch
             .column(0)
             .as_any()
