Skip to content

Commit

Permalink
Support SortMergeJoin spilling (#11218)
Browse files Browse the repository at this point in the history
* Support SortMerge spilling
  • Loading branch information
comphead authored Jul 22, 2024
1 parent 2587df0 commit 63efaee
Show file tree
Hide file tree
Showing 5 changed files with 529 additions and 84 deletions.
27 changes: 24 additions & 3 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ async fn cross_join() {
}

#[tokio::test]
async fn merge_join() {
async fn sort_merge_join_no_spill() {
// Planner chooses MergeJoin only if number of partitions > 1
let config = SessionConfig::new()
.with_target_partitions(2)
Expand All @@ -175,11 +175,32 @@ async fn merge_join() {
"select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
)
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"Failed to allocate additional",
"SMJStream",
"Disk spilling disabled",
])
.with_memory_limit(1_000)
.with_config(config)
.with_scenario(Scenario::AccessLogStreaming)
.run()
.await
}

#[tokio::test]
async fn sort_merge_join_spill() {
// Planner chooses MergeJoin only if number of partitions > 1
let config = SessionConfig::new()
.with_target_partitions(2)
.set_bool("datafusion.optimizer.prefer_hash_join", false);

TestCase::new()
.with_query(
"select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
)
.with_memory_limit(1_000)
.with_config(config)
.with_disk_manager_config(DiskManagerConfig::NewOs)
.with_scenario(Scenario::AccessLogStreaming)
.run()
.await
}
Expand Down Expand Up @@ -453,7 +474,7 @@ impl TestCase {
let table = scenario.table();

let rt_config = RuntimeConfig::new()
// do not allow spilling
// disk manager setting controls the spilling
.with_disk_manager(disk_manager_config)
.with_memory_limit(memory_limit, MEMORY_FRACTION);

Expand Down
19 changes: 18 additions & 1 deletion datafusion/execution/src/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! [`MemoryPool`] for memory management during query execution, [`proxy]` for
//! help with allocation accounting.

use datafusion_common::Result;
use datafusion_common::{internal_err, Result};
use std::{cmp::Ordering, sync::Arc};

mod pool;
Expand Down Expand Up @@ -220,6 +220,23 @@ impl MemoryReservation {
self.size = new_size
}

/// Tries to free `capacity` bytes from this reservation
/// if `capacity` does not exceed [`Self::size`]
/// Returns new reservation size
/// or error if shrinking capacity is more than allocated size
pub fn try_shrink(&mut self, capacity: usize) -> Result<usize> {
if let Some(new_size) = self.size.checked_sub(capacity) {
self.registration.pool.shrink(self, capacity);
self.size = new_size;
Ok(new_size)
} else {
internal_err!(
"Cannot free the capacity {capacity} out of allocated size {}",
self.size
)
}
}

/// Sets the size of this reservation to `capacity`
pub fn resize(&mut self, capacity: usize) {
match capacity.cmp(&self.size) {
Expand Down
Loading

0 comments on commit 63efaee

Please sign in to comment.