Skip to content

Commit 9a99cce

Browse files
committed
Fix
1 parent 6fb5b59 commit 9a99cce

File tree

3 files changed

+121
-17
lines changed

3 files changed

+121
-17
lines changed

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 115 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn
5151
use arrow::datatypes::{DataType, SchemaRef};
5252
use arrow::row::{RowConverter, Rows, SortField};
5353
use datafusion_common::{
54-
exec_datafusion_err, internal_datafusion_err, internal_err, Result,
54+
exec_datafusion_err, internal_datafusion_err, internal_err, DataFusionError, Result,
5555
};
5656
use datafusion_execution::disk_manager::RefCountedTempFile;
5757
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -323,13 +323,8 @@ impl ExternalSorter {
323323
}
324324

325325
self.reserve_memory_for_merge()?;
326-
327-
let size = get_reserved_byte_for_record_batch(&input);
328-
if self.reservation.try_grow(size).is_err() {
329-
self.sort_and_spill_in_mem_batches().await?;
330-
// After spilling all in-memory batches, the retry should succeed
331-
self.reservation.try_grow(size)?;
332-
}
326+
self.reserve_memory_for_batch_and_maybe_spill(&input)
327+
.await?;
333328

334329
self.in_mem_batches.push(input);
335330
Ok(())
@@ -529,6 +524,12 @@ impl ExternalSorter {
529524
/// Sorts the in-memory batches and merges them into a single sorted run, then writes
530525
/// the result to spill files.
531526
async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
527+
if self.in_mem_batches.is_empty() {
528+
return internal_err!(
529+
"in_mem_batches must not be empty when attempting to sort and spill"
530+
);
531+
}
532+
532533
// Release the memory reserved for merge back to the pool so
533534
// there is some left when `in_mem_sort_stream` requests an
534535
// allocation. At the end of this function, memory will be
@@ -678,7 +679,8 @@ impl ExternalSorter {
678679
let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
679680
self.in_mem_batches.clear();
680681
self.reservation
681-
.try_resize(get_reserved_byte_for_record_batch(&batch))?;
682+
.try_resize(get_reserved_byte_for_record_batch(&batch))
683+
.map_err(Self::err_with_oom_context)?;
682684
let reservation = self.reservation.take();
683685
return self.sort_batch_stream(batch, metrics, reservation);
684686
}
@@ -759,12 +761,51 @@ impl ExternalSorter {
759761
if self.runtime.disk_manager.tmp_files_enabled() {
760762
let size = self.sort_spill_reservation_bytes;
761763
if self.merge_reservation.size() != size {
762-
self.merge_reservation.try_resize(size)?;
764+
self.merge_reservation
765+
.try_resize(size)
766+
.map_err(Self::err_with_oom_context)?;
763767
}
764768
}
765769

766770
Ok(())
767771
}
772+
773+
/// Reserves memory to be able to accommodate the given batch.
774+
/// If memory is scarce, tries to spill current in-memory batches to disk first.
775+
async fn reserve_memory_for_batch_and_maybe_spill(
776+
&mut self,
777+
input: &RecordBatch,
778+
) -> Result<()> {
779+
let size = get_reserved_byte_for_record_batch(input);
780+
781+
match self.reservation.try_grow(size) {
782+
Ok(_) => Ok(()),
783+
Err(e) => {
784+
if self.in_mem_batches.is_empty() {
785+
return Err(Self::err_with_oom_context(e));
786+
}
787+
788+
// Spill and try again.
789+
self.sort_and_spill_in_mem_batches().await?;
790+
self.reservation
791+
.try_grow(size)
792+
.map_err(Self::err_with_oom_context)
793+
}
794+
}
795+
}
796+
797+
/// Wraps the error with a context message suggesting settings to tweak.
798+
/// This is meant to be used with DataFusionError::ResourcesExhausted only.
799+
fn err_with_oom_context(e: DataFusionError) -> DataFusionError {
800+
match e {
801+
DataFusionError::ResourcesExhausted(_) => e.context(
802+
"Not enough memory to continue external sort. \
803+
Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes"
804+
),
805+
// This is not an OOM error, so just return it as is.
806+
_ => e,
807+
}
808+
}
768809
}
769810

770811
/// Estimate how much memory is needed to sort a `RecordBatch`.
@@ -1327,7 +1368,7 @@ mod tests {
13271368
use arrow::datatypes::*;
13281369
use datafusion_common::cast::as_primitive_array;
13291370
use datafusion_common::test_util::batches_to_string;
1330-
use datafusion_common::{Result, ScalarValue};
1371+
use datafusion_common::{DataFusionError, Result, ScalarValue};
13311372
use datafusion_execution::config::SessionConfig;
13321373
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
13331374
use datafusion_execution::RecordBatchStream;
@@ -1552,6 +1593,69 @@ mod tests {
15521593
Ok(())
15531594
}
15541595

1596+
#[tokio::test]
1597+
async fn test_batch_reservation_error() -> Result<()> {
1598+
// Pick a memory limit and sort_spill_reservation that make the first batch reservation fail.
1599+
// These values assume that the ExternalSorter will reserve 800 bytes for the first batch.
1600+
let expected_batch_reservation = 800;
1601+
let merge_reservation: usize = 0; // Set to 0 for simplicity
1602+
let memory_limit: usize = expected_batch_reservation + merge_reservation - 1; // Just short of what we need
1603+
1604+
let session_config =
1605+
SessionConfig::new().with_sort_spill_reservation_bytes(merge_reservation);
1606+
let runtime = RuntimeEnvBuilder::new()
1607+
.with_memory_limit(memory_limit, 1.0)
1608+
.build_arc()?;
1609+
let task_ctx = Arc::new(
1610+
TaskContext::default()
1611+
.with_session_config(session_config)
1612+
.with_runtime(runtime),
1613+
);
1614+
1615+
let plan = test::scan_partitioned(1);
1616+
1617+
// Read the first record batch to assert that our memory limit and sort_spill_reservation
1618+
// settings trigger the test scenario.
1619+
{
1620+
let mut stream = plan.execute(0, Arc::clone(&task_ctx))?;
1621+
let first_batch = stream.next().await.unwrap()?;
1622+
let batch_reservation = get_reserved_byte_for_record_batch(&first_batch);
1623+
1624+
assert_eq!(batch_reservation, expected_batch_reservation);
1625+
assert!(memory_limit < (merge_reservation + batch_reservation));
1626+
}
1627+
1628+
let sort_exec = Arc::new(SortExec::new(
1629+
LexOrdering::new(vec![PhysicalSortExpr {
1630+
expr: col("i", &plan.schema())?,
1631+
options: SortOptions::default(),
1632+
}]),
1633+
plan,
1634+
));
1635+
1636+
let result = collect(
1637+
Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
1638+
Arc::clone(&task_ctx),
1639+
)
1640+
.await;
1641+
1642+
let err = result.unwrap_err();
1643+
assert!(
1644+
matches!(err, DataFusionError::Context(..)),
1645+
"Assertion failed: expected a Context error, but got: {:?}",
1646+
err
1647+
);
1648+
1649+
// Assert that the context error is wrapping a resources exhausted error.
1650+
assert!(
1651+
matches!(err.find_root(), DataFusionError::ResourcesExhausted(_)),
1652+
"Assertion failed: expected a ResourcesExhausted error, but got: {:?}",
1653+
err
1654+
);
1655+
1656+
Ok(())
1657+
}
1658+
15551659
#[tokio::test]
15561660
async fn test_sort_spill_utf8_strings() -> Result<()> {
15571661
let session_config = SessionConfig::new()

docs/source/user-guide/sql/explain.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ to see the high level structure of the plan
7171
| | ┌─────────────┴─────────────┐ |
7272
| | │ RepartitionExec │ |
7373
| | │ -------------------- │ |
74-
| | │ output_partition_count: │ |
74+
| | │ input_partition_count: │ |
7575
| | │ 16 │ |
7676
| | │ │ |
7777
| | │ partitioning_scheme: │ |
@@ -80,7 +80,7 @@ to see the high level structure of the plan
8080
| | ┌─────────────┴─────────────┐ |
8181
| | │ RepartitionExec │ |
8282
| | │ -------------------- │ |
83-
| | │ output_partition_count: │ |
83+
| | │ input_partition_count: │ |
8484
| | │ 1 │ |
8585
| | │ │ |
8686
| | │ partitioning_scheme: │ |

0 commit comments

Comments
 (0)