Concatenate inside hash repartition #16223

Closed · wants to merge 7 commits
175 changes: 107 additions & 68 deletions datafusion/physical-plan/src/repartition/mod.rs
@@ -301,97 +301,125 @@ impl BatchPartitioner {
     ///
     /// The time spent repartitioning, not including time spent in `f` will be recorded
     /// to the [`metrics::Time`] provided on construction
     #[deprecated(since = "48.0.0", note = "use partition_iter instead")]
     pub fn partition<F>(&mut self, batch: RecordBatch, mut f: F) -> Result<()>
     where
         F: FnMut(usize, RecordBatch) -> Result<()>,
     {
-        self.partition_iter(batch)?.try_for_each(|res| match res {
-            Ok((partition, batch)) => f(partition, batch),
-            Err(e) => Err(e),
-        })
+        self.partition_iter(&[batch])?.try_for_each(
+            |res: std::result::Result<(usize, RecordBatch), DataFusionError>| match res {
+                Ok((partition, batch)) => f(partition, batch),
+                Err(e) => Err(e),
+            },
+        )
     }
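
Note for reviewers: the deprecated `partition` wrapper now forwards a one-element slice to the slice-based `partition_iter`. A minimal sketch of driving the new shape (`send_downstream` is a hypothetical sink, not part of this PR; module scope is assumed, since `partition_iter` is private):

    // Sketch only: `send_downstream` is hypothetical.
    fn drain_buffer(
        partitioner: &mut BatchPartitioner,
        buffered: &[RecordBatch],
    ) -> Result<()> {
        // Items are (output partition index, repartitioned batch) pairs.
        for res in partitioner.partition_iter(buffered)? {
            let (partition, batch) = res?;
            send_downstream(partition, batch)?; // hypothetical sink
        }
        Ok(())
    }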

     /// Actual implementation of [`partition`](Self::partition).
     ///
     /// The reason this was pulled out is that we need to have a variant of `partition` that works w/ sync functions,
     /// and one that works w/ async. Using an iterator as an intermediate representation was the best way to achieve
     /// this (so we don't need to clone the entire implementation).
-    fn partition_iter(
-        &mut self,
-        batch: RecordBatch,
-    ) -> Result<impl Iterator<Item = Result<(usize, RecordBatch)>> + Send + '_> {
+    fn partition_iter<'a>(
+        &'a mut self,
+        input_batches: &'a [RecordBatch],
+    ) -> Result<impl Iterator<Item = Result<(usize, RecordBatch)>> + Send + 'a> {
         let it: Box<dyn Iterator<Item = Result<(usize, RecordBatch)>> + Send> =
             match &mut self.state {
                 BatchPartitionerState::RoundRobin {
                     num_partitions,
                     next_idx,
                 } => {
-                    let idx = *next_idx;
+                    let batch = &input_batches[0];
+                    let target_partition_idx = *next_idx;
                     *next_idx = (*next_idx + 1) % *num_partitions;
-                    Box::new(std::iter::once(Ok((idx, batch))))
+                    Box::new(std::iter::once(Ok((target_partition_idx, batch.clone()))))
                 }
                 BatchPartitionerState::Hash {
                     random_state,
                     exprs,
-                    num_partitions: partitions,
+                    num_partitions,
                     hash_buffer,
                 } => {
-                    // Tracking time required for distributing indexes across output partitions
-                    let timer = self.timer.timer();
-
-                    let arrays = exprs
-                        .iter()
-                        .map(|expr| expr.evaluate(&batch)?.into_array(batch.num_rows()))
-                        .collect::<Result<Vec<_>>>()?;
-
-                    hash_buffer.clear();
-                    hash_buffer.resize(batch.num_rows(), 0);
-
-                    create_hashes(&arrays, random_state, hash_buffer)?;
-
-                    let mut indices: Vec<_> = (0..*partitions)
-                        .map(|_| Vec::with_capacity(batch.num_rows()))
-                        .collect();
-
-                    for (index, hash) in hash_buffer.iter().enumerate() {
-                        indices[(*hash % *partitions as u64) as usize].push(index as u32);
-                    }
-
-                    // Finished building index-arrays for output partitions
-                    timer.done();
-
-                    // Borrowing partitioner timer to prevent moving `self` to closure
-                    let partitioner_timer = &self.timer;
-                    let it = indices
-                        .into_iter()
-                        .enumerate()
-                        .filter_map(|(partition, indices)| {
-                            let indices: PrimitiveArray<UInt32Type> = indices.into();
-                            (!indices.is_empty()).then_some((partition, indices))
-                        })
-                        .map(move |(partition, indices)| {
-                            // Tracking time required for repartitioned batches construction
-                            let _timer = partitioner_timer.timer();
-
-                            // Produce batches based on indices
-                            let columns = take_arrays(batch.columns(), &indices, None)?;
-
-                            let mut options = RecordBatchOptions::new();
-                            options = options.with_row_count(Some(indices.len()));
-                            let batch = RecordBatch::try_new_with_options(
-                                batch.schema(),
-                                columns,
-                                &options,
-                            )
-                            .unwrap();
-
-                            Ok((partition, batch))
-                        });
-
-                    Box::new(it)
+                    let _timer = self.timer.timer(); // Overall timer for hash partitioning logic
+
+                    // Initialize a vector of vectors to hold batches for each output partition
+                    let mut batches_for_output_partitions: Vec<Vec<RecordBatch>> =
+                        vec![Vec::new(); *num_partitions];
+
+                    let schema = input_batches[0].schema();
+
+                    let mut indices_for_input_batch_per_output_partition: Vec<Vec<u32>> =
+                        vec![Vec::new(); *num_partitions];
+
+                    for current_input_batch in input_batches {
+                        if current_input_batch.num_rows() == 0 {
+                            continue;
+                        }
+
+                        let arrays = exprs
+                            .iter()
+                            .map(|expr| {
+                                expr.evaluate(current_input_batch)?
+                                    .into_array(current_input_batch.num_rows())
+                            })
+                            .collect::<Result<Vec<_>>>()?;
+
+                        hash_buffer.clear();
+                        hash_buffer.resize(current_input_batch.num_rows(), 0);
+                        create_hashes(&arrays, random_state, hash_buffer)?;
+
+                        for (row_idx, hash_val) in hash_buffer.iter().enumerate() {
+                            let output_idx =
+                                (*hash_val % *num_partitions as u64) as usize;
+                            indices_for_input_batch_per_output_partition[output_idx]
+                                .push(row_idx as u32);
+                        }
+
+                        for output_idx in 0..*num_partitions {
+                            let indices_for_current_output = std::mem::take(
+                                &mut indices_for_input_batch_per_output_partition
+                                    [output_idx],
+                            );
+                            if !indices_for_current_output.is_empty() {
+                                let indices_array: PrimitiveArray<UInt32Type> =
+                                    indices_for_current_output.into();
+                                let columns = take_arrays(
+                                    current_input_batch.columns(),
+                                    &indices_array,
+                                    None,
+                                )?;
+                                let options = RecordBatchOptions::new()
+                                    .with_row_count(Some(indices_array.len()));
+                                let taken_batch = RecordBatch::try_new_with_options(
+                                    Arc::clone(current_input_batch.schema_ref()),
+                                    columns,
+                                    &options,
+                                )?;
+                                batches_for_output_partitions[output_idx]
+                                    .push(taken_batch);
+                            }
+                        }
+                    }
+
+                    Box::new(
+                        batches_for_output_partitions
+                            .into_iter()
+                            .enumerate()
+                            .filter_map(move |(output_idx, batches_to_concat)| {
+                                (!batches_to_concat.is_empty()).then(|| {
+                                    // All batches for a given output partition must have the same schema
+                                    // as the first non-empty input batch.
+                                    let concatenated_batch =
+                                        arrow::compute::concat_batches(
+                                            &schema, // Use schema from the first non-empty input batch
+                                            batches_to_concat.iter(),
+                                        )?;
+                                    Ok((output_idx, concatenated_batch))
+                                })
+                            }),
+                    )
                 }
             };
 
         Ok(it)
     }
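
The heart of the change: instead of emitting one small batch per (input batch × output partition) pair, rows are gathered per output partition across all buffered inputs and concatenated into a single batch. A self-contained sketch of the same technique against public arrow / datafusion-common APIs — the function name, key-column indices, and fixed-seed `RandomState` are illustrative; DataFusion derives its hash keys from the `Partitioning::Hash` expressions and its own random state:

    use std::sync::Arc;

    use ahash::RandomState;
    use arrow::array::{ArrayRef, UInt32Array};
    use arrow::compute::{concat_batches, take_arrays};
    use arrow::record_batch::RecordBatch;
    use datafusion_common::hash_utils::create_hashes;

    fn hash_repartition(
        inputs: &[RecordBatch],
        key_cols: &[usize],
        num_partitions: usize,
    ) -> datafusion_common::Result<Vec<Option<RecordBatch>>> {
        // Per-output-partition staging area, as in the PR.
        let mut staged: Vec<Vec<RecordBatch>> = vec![Vec::new(); num_partitions];
        let random_state = RandomState::with_seeds(0, 0, 0, 0); // fixed seeds, sketch only
        let mut hashes: Vec<u64> = Vec::new();

        for batch in inputs.iter().filter(|b| b.num_rows() > 0) {
            let keys: Vec<ArrayRef> =
                key_cols.iter().map(|&i| Arc::clone(batch.column(i))).collect();
            hashes.clear();
            hashes.resize(batch.num_rows(), 0);
            create_hashes(&keys, &random_state, &mut hashes)?;

            // Bucket row indices by hash, then gather each bucket with `take`.
            let mut indices: Vec<Vec<u32>> = vec![Vec::new(); num_partitions];
            for (row, h) in hashes.iter().enumerate() {
                indices[(*h % num_partitions as u64) as usize].push(row as u32);
            }
            for (p, idx) in indices.into_iter().enumerate() {
                if idx.is_empty() {
                    continue;
                }
                let idx = UInt32Array::from(idx);
                let cols = take_arrays(batch.columns(), &idx, None)?;
                staged[p].push(RecordBatch::try_new(batch.schema(), cols)?);
            }
        }

        // One concat per output partition replaces many small batches.
        staged
            .into_iter()
            .map(|chunks| match chunks.first() {
                None => Ok(None),
                Some(first) => {
                    let schema = first.schema();
                    Ok(Some(concat_batches(&schema, chunks.iter())?))
                }
            })
            .collect()
    }

Downstream operators then receive at most one batch per output partition per flush, rather than up to `num_partitions × inputs` small batches, which is the point of the PR.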

@@ -926,9 +954,11 @@ impl RepartitionExec {
         partitioning: Partitioning,
         metrics: RepartitionMetrics,
     ) -> Result<()> {
+        let is_hash_partitioning = matches!(partitioning, Partitioning::Hash(_, _));
         let mut partitioner =
             BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;
 
+        let mut batches_buffer = Vec::with_capacity(partitioner.num_partitions());
         // While there are still outputs to send to, keep pulling inputs
         let mut batches_until_yield = partitioner.num_partitions();
         while !output_channels.is_empty() {
@@ -938,12 +968,20 @@
             timer.done();
 
-            // Input is done
-            let batch = match result {
-                Some(result) => result?,
-                None => break,
+            match result {
+                Some(result) => {
+                    batches_buffer.push(result?);
+                    if is_hash_partitioning
+                        && batches_buffer.len() < partitioner.num_partitions()
+                    {
+                        // Keep buffering batches
+                        continue;
+                    }
+                }
+                None if batches_buffer.is_empty() => break,
+                None => {}
             };
 
-            for res in partitioner.partition_iter(batch)? {
+            for res in partitioner.partition_iter(&batches_buffer)? {
                 let (partition, batch) = res?;
                 let size = batch.get_array_memory_size();
 
@@ -960,6 +998,7 @@
                 }
                 timer.done();
             }
+            batches_buffer.clear();
 
             // If the input stream is endless, we may spin forever and
             // never yield back to tokio. See
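
Taken together, these hunks change the driver loop: for hash partitioning, input batches are buffered until one per output partition has accumulated, then handed to `partition_iter` as a group and flushed. A condensed sketch of that control flow, assuming module scope (`Result` is DataFusion's alias; the `send` callback stands in for the per-partition channel sends in the real loop):

    // Sketch only: condensed from the loop in this diff; backpressure,
    // memory accounting, and yielding are elided.
    async fn pull_and_repartition<S, F>(
        mut input: S,
        partitioner: &mut BatchPartitioner,
        is_hash_partitioning: bool,
        mut send: F,
    ) -> Result<()>
    where
        S: futures::Stream<Item = Result<RecordBatch>> + Unpin,
        F: FnMut(usize, RecordBatch) -> Result<()>,
    {
        use futures::StreamExt;

        let num_partitions = partitioner.num_partitions();
        let mut buffer: Vec<RecordBatch> = Vec::with_capacity(num_partitions);

        loop {
            match input.next().await {
                Some(batch) => {
                    buffer.push(batch?);
                    // Hash partitioning: accumulate one batch per output
                    // partition before paying the repartition cost.
                    if is_hash_partitioning && buffer.len() < num_partitions {
                        continue;
                    }
                }
                None if buffer.is_empty() => break, // input drained, nothing pending
                None => {}                          // input drained: flush the tail
            }
            for res in partitioner.partition_iter(&buffer)? {
                let (partition, batch) = res?;
                send(partition, batch)?;
            }
            buffer.clear();
        }
        Ok(())
    }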