From 5d449cc0bfd986f9e356828feee8d22be9dd38b5 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Fri, 15 Mar 2024 16:56:00 +0100 Subject: [PATCH] feat: make ooc sort configurable (#15084) --- .../src/executors/sinks/sort/ooc.rs | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/crates/polars-pipe/src/executors/sinks/sort/ooc.rs b/crates/polars-pipe/src/executors/sinks/sort/ooc.rs index be96d8802134..64acfa30a5db 100644 --- a/crates/polars-pipe/src/executors/sinks/sort/ooc.rs +++ b/crates/polars-pipe/src/executors/sinks/sort/ooc.rs @@ -135,10 +135,14 @@ pub(super) fn sort_ooc( ooc_start: Instant, ) -> PolarsResult { let now = Instant::now(); + let multithreaded_partition = std::env::var("POLARS_OOC_SORT_PAR_PARTITION").is_ok(); + let spill_size = std::env::var("POLARS_OOC_SORT_SPILL_SIZE") + .map(|v| v.parse::<usize>().expect("integer")) + .unwrap_or(1 << 26); let samples = samples.to_physical_repr().into_owned(); let spill_size = std::cmp::min( memtrack.get_available_latest() / (samples.len() * 3), - 1 << 26, + spill_size, ); // we collect as I am not sure that if we write to the same directory the @@ -167,7 +171,8 @@ pub(super) fn sort_ooc( let assigned_parts = det_partitions(sort_col, &samples, descending); // partition the dataframe into proper buckets - let (iter, unique_assigned_parts) = partition_df(df, &assigned_parts)?; + let (iter, unique_assigned_parts) = + partition_df(df, &assigned_parts, multithreaded_partition)?; for (part, df) in unique_assigned_parts.into_no_null_iter().zip(iter) { if let Some(df) = partitions_spiller.push(part as usize, df) { io_thread.dump_partition_local(part, df) @@ -227,8 +232,12 @@ fn det_partitions(s: &Series, partitions: &Series, descending: bool) -> IdxCa { search_sorted(partitions, &s, SearchSortedSide::Any, descending).unwrap() } -fn partition_df(df: DataFrame, partitions: &IdxCa) -> PolarsResult<(DfIter, IdxCa)> { - let groups = partitions.group_tuples(true, false)?; +fn partition_df( + df: 
DataFrame, + partitions: &IdxCa, + multithreaded: bool, +) -> PolarsResult<(DfIter, IdxCa)> { + let groups = partitions.group_tuples(multithreaded, false)?; let partitions = unsafe { partitions.clone().into_series().agg_first(&groups) }; let partitions = partitions.idx().unwrap().clone(); @@ -236,7 +245,9 @@ fn partition_df(df: DataFrame, partitions: &IdxCa) -> PolarsResult<(DfIter, IdxC GroupsProxy::Idx(idx) => { let iter = idx.into_iter().map(move |(_, group)| { // groups are in bounds and sorted - unsafe { df._take_unchecked_slice_sorted(&group, true, IsSorted::Ascending) } + unsafe { + df._take_unchecked_slice_sorted(&group, multithreaded, IsSorted::Ascending) + } }); Box::new(iter) as DfIter },