Skip to content

Set TrackConsumersPool as default in datafusion-cli #16081

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

use std::collections::HashMap;
use std::env;
use std::num::NonZeroUsize;
use std::path::Path;
use std::process::ExitCode;
use std::sync::{Arc, LazyLock};

use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionConfig;
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool};
use datafusion::execution::memory_pool::{
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::DiskManager;
use datafusion::prelude::SessionContext;
Expand Down Expand Up @@ -118,6 +121,13 @@ struct Args {
)]
mem_pool_type: PoolType,

#[clap(
long,
help = "The number of top memory consumers to display when query fails due to memory exhaustion. To disable memory consumer tracking, set this value to 0",
default_value = "3"
)]
top_memory_consumers: usize,

#[clap(
long,
help = "The max number of rows to display for 'Table' format\n[possible values: numbers(0/10/...), inf(no limit)]",
Expand Down Expand Up @@ -169,9 +179,22 @@ async fn main_inner() -> Result<()> {
if let Some(memory_limit) = args.memory_limit {
// set memory pool type
let pool: Arc<dyn MemoryPool> = match args.mem_pool_type {
PoolType::Fair => Arc::new(FairSpillPool::new(memory_limit)),
PoolType::Greedy => Arc::new(GreedyMemoryPool::new(memory_limit)),
PoolType::Fair if args.top_memory_consumers == 0 => {
Arc::new(FairSpillPool::new(memory_limit))
}
PoolType::Fair => Arc::new(TrackConsumersPool::new(
FairSpillPool::new(memory_limit),
NonZeroUsize::new(args.top_memory_consumers).unwrap(),
)),
PoolType::Greedy if args.top_memory_consumers == 0 => {
Arc::new(GreedyMemoryPool::new(memory_limit))
}
PoolType::Greedy => Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(memory_limit),
NonZeroUsize::new(args.top_memory_consumers).unwrap(),
)),
};
Comment on lines 181 to 196
Copy link
Preview

Copilot AI May 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The logic for wrapping Fair and Greedy pools with TrackConsumersPool is duplicated. Consider refactoring this common functionality into a helper function to reduce code duplication.

Copilot uses AI. Check for mistakes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@2010YOUY01 I ended up keeping the match statement as is since TrackConsumersPool::new() requires a concrete type implementing MemoryPool, and we can't wrap a dyn MemoryPool directly. 😢


rt_builder = rt_builder.with_memory_pool(pool)
}

Expand Down
36 changes: 36 additions & 0 deletions datafusion-cli/tests/cli_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,42 @@ fn test_cli_format<'a>(#[case] format: &'a str) {
assert_cmd_snapshot!(cmd);
}

// Snapshot test for the `--top-memory-consumers` CLI flag.
//
// Each case supplies a snapshot suffix and the extra CLI arguments to append
// (none for the default case). The query is run with a 10M memory limit and
// the resulting CLI output is captured as a snapshot; the filters below
// normalise the parts of the error message that vary between runs.
#[rstest]
#[case("no_track", ["--top-memory-consumers", "0"])]
#[case("top2", ["--top-memory-consumers", "2"])]
#[case("top3_default", [])]
#[test]
fn test_cli_top_memory_consumers<'a>(
    #[case] snapshot_name: &str,
    #[case] top_memory_consumers: impl IntoIterator<Item = &'a str>,
) {
    // Assemble the command line first: fixed memory limit and query, followed
    // by the per-case arguments.
    let sql = "select * from generate_series(1,500000) as t1(v1) order by v1;";
    let mut cmd = cli();
    cmd.args(["--memory-limit", "10M", "--command", sql])
        .args(top_memory_consumers);

    // (pattern, replacement) pairs, registered in this exact order:
    //   1. consumer name / id / spill flag / consumed byte count,
    //   2. the trailing "Error: Failed to allocate ..." detail line,
    //   3. the "Resources exhausted: Failed to allocate ..." detail line.
    let output_filters = [
        (
            r"[^\s]+\#\d+\(can spill: (true|false)\) consumed .*?B",
            "Consumer(can spill: bool) consumed XB",
        ),
        (
            r"Error: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool",
            "Error: Failed to allocate ",
        ),
        (
            r"Resources exhausted: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool",
            "Resources exhausted: Failed to allocate",
        ),
    ];

    let mut snapshot_settings = make_settings();
    snapshot_settings.set_snapshot_suffix(snapshot_name);
    for (pattern, replacement) in output_filters {
        snapshot_settings.add_filter(pattern, replacement);
    }

    // Keep the settings bound for the duration of the snapshot assertion.
    let _settings_guard = snapshot_settings.bind_to_scope();

    assert_cmd_snapshot!(cmd);
}

#[tokio::test]
async fn test_cli() {
if env::var("TEST_STORAGE_INTEGRATION").is_err() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
source: datafusion-cli/tests/cli_integration.rs
info:
program: datafusion-cli
args:
- "--memory-limit"
- 10M
- "--command"
- "select * from generate_series(1,500000) as t1(v1) order by v1;"
- "--top-memory-consumers"
- "0"
---
success: false
exit_code: 1
----- stdout -----
[CLI_VERSION]
Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
caused by
Resources exhausted: Failed to allocate

----- stderr -----
24 changes: 24 additions & 0 deletions datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
source: datafusion-cli/tests/cli_integration.rs
info:
program: datafusion-cli
args:
- "--memory-limit"
- 10M
- "--command"
- "select * from generate_series(1,500000) as t1(v1) order by v1;"
- "--top-memory-consumers"
- "2"
---
success: false
exit_code: 1
----- stdout -----
[CLI_VERSION]
Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
caused by
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
Consumer(can spill: bool) consumed XB,
Consumer(can spill: bool) consumed XB.
Error: Failed to allocate

----- stderr -----
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
source: datafusion-cli/tests/cli_integration.rs
info:
program: datafusion-cli
args:
- "--memory-limit"
- 10M
- "--command"
- "select * from generate_series(1,500000) as t1(v1) order by v1;"
---
success: false
exit_code: 1
----- stdout -----
[CLI_VERSION]
Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
caused by
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
Consumer(can spill: bool) consumed XB,
Consumer(can spill: bool) consumed XB,
Consumer(can spill: bool) consumed XB.
Error: Failed to allocate

----- stderr -----
3 changes: 3 additions & 0 deletions docs/source/user-guide/cli/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ OPTIONS:
--mem-pool-type <MEM_POOL_TYPE>
Specify the memory pool type 'greedy' or 'fair', default to 'greedy'

--top-memory-consumers <TOP_MEMORY_CONSUMERS>
The number of top memory consumers to display when query fails due to memory exhaustion. To disable memory consumer tracking, set this value to 0 [default: 3]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Description updated


-d, --disk-limit <DISK_LIMIT>
Available disk space for spilling queries (e.g. '10g'), default to None (uses DataFusion's default value of '100g')

Expand Down