-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Fix: External sort failing on StringView
due to shared buffers
#14823
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
55bda70
cea8b2b
0e4c164
6c3f4d8
92ca3b0
d5e703a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,7 +24,7 @@ use std::fmt; | |
use std::fmt::{Debug, Formatter}; | ||
use std::sync::Arc; | ||
|
||
use crate::common::{spawn_buffered, IPCWriter}; | ||
use crate::common::spawn_buffered; | ||
use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; | ||
use crate::expressions::PhysicalSortExpr; | ||
use crate::limit::LimitStream; | ||
|
@@ -44,7 +44,9 @@ use crate::{ | |
Statistics, | ||
}; | ||
|
||
use arrow::array::{Array, RecordBatch, RecordBatchOptions, UInt32Array}; | ||
use arrow::array::{ | ||
Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array, | ||
}; | ||
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn}; | ||
use arrow::datatypes::{DataType, SchemaRef}; | ||
use arrow::row::{RowConverter, SortField}; | ||
|
@@ -300,6 +302,7 @@ impl ExternalSorter { | |
if input.num_rows() == 0 { | ||
return Ok(()); | ||
} | ||
|
||
self.reserve_memory_for_merge()?; | ||
|
||
let size = get_reserved_byte_for_record_batch(&input); | ||
|
@@ -397,6 +400,8 @@ impl ExternalSorter { | |
return Ok(0); | ||
} | ||
|
||
self.organize_stringview_arrays()?; | ||
|
||
debug!("Spilling sort data of ExternalSorter to disk whilst inserting"); | ||
|
||
let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?; | ||
|
@@ -414,6 +419,69 @@ impl ExternalSorter { | |
Ok(used) | ||
} | ||
|
||
/// Reconstruct `self.in_mem_batches` to organize the payload buffers of each | ||
/// `StringViewArray` in sequential order by calling `gc()` on them. | ||
/// | ||
/// Note this is a workaround until <https://github.com/apache/arrow-rs/issues/7185> is | ||
/// available | ||
/// | ||
/// # Rationale | ||
/// After (merge-based) sorting, all batches will be sorted into a single run, | ||
/// but physically this sorted run is chunked into many small batches. For | ||
/// `StringViewArray`s inside each sorted run, their inner buffers are not | ||
/// re-constructed by default, leading to non-sequential payload locations | ||
/// (permuted by the `interleave()` Arrow kernel). A single payload buffer might | ||
/// be shared by multiple `RecordBatch`es. | ||
/// When writing each batch to disk, the writer has to write all referenced buffers, | ||
/// because they have to be read back one by one to reduce memory usage. This | ||
/// causes extra disk reads and writes, and potentially execution failure. | ||
/// | ||
/// # Example | ||
/// Before sorting: | ||
/// batch1 -> buffer1 | ||
/// batch2 -> buffer2 | ||
/// | ||
/// sorted_batch1 -> buffer1 | ||
/// -> buffer2 | ||
/// sorted_batch2 -> buffer1 | ||
/// -> buffer2 | ||
/// | ||
/// Then when spilling each batch, the writer has to write all referenced buffers | ||
/// repeatedly. | ||
fn organize_stringview_arrays(&mut self) -> Result<()> { | ||
let mut organized_batches = Vec::with_capacity(self.in_mem_batches.len()); | ||
|
||
for batch in self.in_mem_batches.drain(..) { | ||
let mut new_columns: Vec<Arc<dyn Array>> = | ||
Vec::with_capacity(batch.num_columns()); | ||
|
||
let mut arr_mutated = false; | ||
for array in batch.columns() { | ||
if let Some(string_view_array) = | ||
array.as_any().downcast_ref::<StringViewArray>() | ||
{ | ||
let new_array = string_view_array.gc(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will string_view_array.gc() affect the performance when it is called many times? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Updated — if I understand correctly, it should not affect performance too much, because we only retain the buffer data that is still in use. before gc:
after gc:
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good point — there is some inefficiency here; I filed apache/arrow-rs#7184. Once done on |
||
new_columns.push(Arc::new(new_array)); | ||
arr_mutated = true; | ||
} else { | ||
new_columns.push(Arc::clone(array)); | ||
} | ||
} | ||
|
||
let organized_batch = if arr_mutated { | ||
RecordBatch::try_new(batch.schema(), new_columns)? | ||
} else { | ||
batch | ||
}; | ||
|
||
organized_batches.push(organized_batch); | ||
} | ||
|
||
self.in_mem_batches = organized_batches; | ||
|
||
Ok(()) | ||
} | ||
|
||
/// Sorts the in_mem_batches in place | ||
/// | ||
/// Sorting may have freed memory, especially if fetch is `Some`. If | ||
|
@@ -439,54 +507,29 @@ impl ExternalSorter { | |
// `self.in_mem_batches` is already taken away by the sort_stream, now it is empty. | ||
// We'll gradually collect the sorted stream into self.in_mem_batches, or directly | ||
// write sorted batches to disk when the memory is insufficient. | ||
let mut spill_writer: Option<IPCWriter> = None; | ||
while let Some(batch) = sorted_stream.next().await { | ||
let batch = batch?; | ||
match &mut spill_writer { | ||
None => { | ||
let sorted_size = get_reserved_byte_for_record_batch(&batch); | ||
if self.reservation.try_grow(sorted_size).is_err() { | ||
// Directly write in_mem_batches as well as all the remaining batches in | ||
// sorted_stream to disk. Further batches fetched from `sorted_stream` will | ||
// be handled by the `Some(writer)` matching arm. | ||
let spill_file = | ||
self.runtime.disk_manager.create_tmp_file("Sorting")?; | ||
let mut writer = IPCWriter::new(spill_file.path(), &self.schema)?; | ||
// Flush everything in memory to the spill file | ||
for batch in self.in_mem_batches.drain(..) { | ||
writer.write(&batch)?; | ||
} | ||
// as well as the newly sorted batch | ||
writer.write(&batch)?; | ||
spill_writer = Some(writer); | ||
self.reservation.free(); | ||
self.spills.push(spill_file); | ||
} else { | ||
self.in_mem_batches.push(batch); | ||
self.in_mem_batches_sorted = true; | ||
} | ||
} | ||
Some(writer) => { | ||
writer.write(&batch)?; | ||
} | ||
let sorted_size = get_reserved_byte_for_record_batch(&batch); | ||
if self.reservation.try_grow(sorted_size).is_err() { | ||
// Although the reservation is not enough, the batch is | ||
// already in memory, so it's okay to combine it with previously | ||
// sorted batches, and spill together. | ||
self.in_mem_batches.push(batch); | ||
self.spill().await?; // reservation is freed in spill() | ||
} else { | ||
self.in_mem_batches.push(batch); | ||
self.in_mem_batches_sorted = true; | ||
} | ||
} | ||
|
||
// Drop early to free up memory reserved by the sorted stream, otherwise the | ||
// upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory. | ||
drop(sorted_stream); | ||
|
||
if let Some(writer) = &mut spill_writer { | ||
writer.finish()?; | ||
self.metrics.spill_count.add(1); | ||
self.metrics.spilled_rows.add(writer.num_rows); | ||
self.metrics.spilled_bytes.add(writer.num_bytes); | ||
} | ||
|
||
// Sorting may free up some memory especially when fetch is `Some`. If we have | ||
// not freed more than 50% of the memory, then we have to spill to free up more | ||
// memory for inserting more batches. | ||
if spill_writer.is_none() && self.reservation.size() > before / 2 { | ||
if self.reservation.size() > before / 2 { | ||
// We have not freed more than 50% of the memory, so we have to spill to | ||
// free up more memory | ||
self.spill().await?; | ||
|
@@ -1422,10 +1465,14 @@ mod tests { | |
let spill_count = metrics.spill_count().unwrap(); | ||
let spilled_rows = metrics.spilled_rows().unwrap(); | ||
let spilled_bytes = metrics.spilled_bytes().unwrap(); | ||
// Processing 840 KB of data using 400 KB of memory requires at least 2 spills | ||
// It will spill roughly 18000 rows and 800 KBytes. | ||
// We leave a little wiggle room for the actual numbers. | ||
assert!((2..=10).contains(&spill_count)); | ||
|
||
// This test case is processing 840KB of data using 400KB of memory. Note | ||
// that buffered batches can't be dropped until all sorted batches are | ||
// generated, so we can only buffer `sort_spill_reservation_bytes` of sorted | ||
// batches. | ||
// The number of spills is roughly calculated as: | ||
// `number_of_batches / (sort_spill_reservation_bytes / batch_size)` | ||
assert!((12..=18).contains(&spill_count)); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is caused by the above refactor; the old implementation forgot to update the statistics, so we missed several counts. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we also update the comments? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. updated in 92ca3b0 |
||
assert!((15000..=20000).contains(&spilled_rows)); | ||
assert!((700000..=900000).contains(&spilled_bytes)); | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.