Skip to content

Optimize hash partitioning for cache friendliness #15981

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 86 additions & 29 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder;
use crate::stream::RecordBatchStreamAdapter;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics};

use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
use arrow::compute::take_arrays;
use arrow::datatypes::{SchemaRef, UInt32Type};
use arrow::array::{RecordBatch, RecordBatchOptions, UInt32Array};
use arrow::compute;
use arrow::datatypes::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::utils::transpose;
use datafusion_common::HashMap;
Expand Down Expand Up @@ -192,6 +192,7 @@ enum BatchPartitionerState {
exprs: Vec<Arc<dyn PhysicalExpr>>,
num_partitions: usize,
hash_buffer: Vec<u64>,
indices_buffer: Vec<u32>,
},
RoundRobin {
num_partitions: usize,
Expand All @@ -216,7 +217,8 @@ impl BatchPartitioner {
num_partitions,
// Use fixed random hash
random_state: ahash::RandomState::with_seeds(0, 0, 0, 0),
hash_buffer: vec![],
hash_buffer: Vec::new(),
indices_buffer: Vec::new(),
},
other => return not_impl_err!("Unsupported repartitioning scheme {other:?}"),
};
Expand Down Expand Up @@ -267,6 +269,7 @@ impl BatchPartitioner {
exprs,
num_partitions: partitions,
hash_buffer,
indices_buffer,
} => {
// Tracking time required for distributing indexes across output partitions
let timer = self.timer.timer();
Expand All @@ -281,44 +284,98 @@ impl BatchPartitioner {

create_hashes(&arrays, random_state, hash_buffer)?;

let mut indices: Vec<_> = (0..*partitions)
.map(|_| Vec::with_capacity(batch.num_rows()))
.collect();
indices_buffer.clear();
indices_buffer.resize(batch.num_rows(), 0);

for (index, hash) in hash_buffer.iter().enumerate() {
indices[(*hash % *partitions as u64) as usize].push(index as u32);
let mut cum_histogram = vec![0; *partitions];

if partitions.is_power_of_two() {
for hash in hash_buffer.iter_mut() {
// modulo bit trick: a % (2^k) == a & (2^k - 1)
*hash &= *partitions as u64 - 1;
cum_histogram[*hash as usize] += 1;
}
} else {
for hash in hash_buffer.iter_mut() {
*hash %= *partitions as u64;
cum_histogram[*hash as usize] += 1;
}
}

// Finished building index-arrays for output partitions
timer.done();
for idx in 1..cum_histogram.len() {
cum_histogram[idx] += cum_histogram[idx - 1];
}

// Borrowing partitioner timer to prevent moving `self` to closure
let partitioner_timer = &self.timer;
let it = indices
.into_iter()
.enumerate()
.filter_map(|(partition, indices)| {
let indices: PrimitiveArray<UInt32Type> = indices.into();
(!indices.is_empty()).then_some((partition, indices))
// *basically* counting sort
for idx in (0..batch.num_rows()).rev() {
let partition = hash_buffer[idx] as usize;
cum_histogram[partition] -= 1;
indices_buffer[cum_histogram[partition]] = idx as u32;
}

// The cumulative histogram now stores the start of the index range for each partition:
// The indices for partition i are stored at indices[cum_histogram[i]..cum_histogram[i + 1]]
// (for the last partition the range ends at indices.len()).
//
// Temporarily take the indices_buffer out of the partitioner state; it is put back later (pinky promise).
let indices: UInt32Array = std::mem::take(indices_buffer).into();

// We now slice up indices by partition so we can use the `take` kernel
let mut partition_indices = Vec::with_capacity(*partitions);
for partition in 0..*partitions {
let start = cum_histogram[partition];
let end = if partition == *partitions - 1 {
indices.len()
} else {
cum_histogram[partition + 1]
};

if start != end {
partition_indices
.push((partition, indices.slice(start, end - start)));
}
}

// Vector of (partition, partition_length, partition columns) -- the data needed to construct
// an output batch for a partition.
let mut output_batches = partition_indices
.iter()
.map(|(partition, indices)| {
(*partition, indices.len(), Vec::new())
})
.map(move |(partition, indices)| {
// Tracking time required for repartitioned batches construction
let _timer = partitioner_timer.timer();
.collect::<Vec<_>>();

for column in batch.columns() {
for (index, (_, indices)) in partition_indices.iter().enumerate()
{
output_batches[index]
.2
.push(compute::take(column, indices, None)?);
}
}

// Produce batches based on indices
let columns = take_arrays(batch.columns(), &indices, None)?;
// Release references to the indices buffer so we can put it back into the partitioner's state
drop(partition_indices);

// Put back the indices buffer -- this won't fail because we are in control of all references to the underlying buffers
*indices_buffer =
indices.into_parts().1.into_inner().into_vec().unwrap();

// Assume the rest of the function is trivial
timer.done();

let mut options = RecordBatchOptions::new();
options = options.with_row_count(Some(indices.len()));
let it = output_batches.into_iter().map(
move |(partition, length, columns)| {
let options =
RecordBatchOptions::new().with_row_count(Some(length));
let batch = RecordBatch::try_new_with_options(
batch.schema(),
columns,
&options,
)
.unwrap();
)?;

Ok((partition, batch))
});
},
);

Box::new(it)
}
Expand Down