
Commit dc7df1a
Concatenate inside hash repartition
1 parent 90d8f09

1 file changed: datafusion/physical-plan/src/repartition/mod.rs
23 additions, 25 deletions
@@ -301,19 +301,17 @@ impl BatchPartitioner {
     ///
     /// The time spent repartitioning, not including time spent in `f` will be recorded
     /// to the [`metrics::Time`] provided on construction
-    pub fn partition<'a, F>(
-        &'a mut self,
-        input_batches: &'a [RecordBatch],
-        mut f: F,
-    ) -> Result<()>
+    #[deprecated(since = "48.0.0", note = "use partition_iter instead")]
+    pub fn partition<F>(&mut self, batch: RecordBatch, mut f: F) -> Result<()>
     where
         F: FnMut(usize, RecordBatch) -> Result<()>,
     {
-        self.partition_iter(input_batches)?
-            .try_for_each(|res| match res {
+        self.partition_iter(&[batch])?.try_for_each(
+            |res: std::result::Result<(usize, RecordBatch), DataFusionError>| match res {
                 Ok((partition, batch)) => f(partition, batch),
                 Err(e) => Err(e),
-            })
+            },
+        )
     }
 
     /// Actual implementation of [`partition`](Self::partition).
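
With this hunk, the deprecated `partition` becomes a thin adapter over `partition_iter`: it wraps its single input batch in a one-element slice and drains the returned iterator with `try_for_each`, forwarding each `(partition, batch)` pair to the caller's closure. The self-contained sketch below illustrates the same callback-over-iterator pattern with plain integers; `partition_like`, `partition_iter_like`, and the modulo-4 routing are hypothetical stand-ins for this example, not DataFusion APIs.

// Hypothetical stand-in for an iterator-style partitioner: routes each value
// to one of four "partitions" and yields (partition, value) pairs.
fn partition_iter_like(
    values: &[i64],
) -> impl Iterator<Item = Result<(usize, i64), String>> + '_ {
    values.iter().map(|v| Ok(((*v as usize) % 4, *v)))
}

// Callback-style wrapper, mirroring how the deprecated `partition` now
// drives `partition_iter` with `try_for_each`.
fn partition_like<F>(values: &[i64], mut f: F) -> Result<(), String>
where
    F: FnMut(usize, i64) -> Result<(), String>,
{
    partition_iter_like(values).try_for_each(|res| match res {
        Ok((partition, value)) => f(partition, value),
        Err(e) => Err(e),
    })
}

fn main() -> Result<(), String> {
    partition_like(&[1, 2, 3, 4], |partition, value| {
        println!("partition {partition}: {value}");
        Ok(())
    })
}
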
@@ -347,7 +345,6 @@ impl BatchPartitioner {
                 // Initialize a vector of vectors to hold batches for each output partition
                 let mut batches_for_output_partitions: Vec<Vec<RecordBatch>> =
                     vec![Vec::new(); *num_partitions];
-                let mut result_vec: Vec<Result<(usize, RecordBatch)>> = Vec::new();
 
                 let schema = input_batches[0].schema();
 
@@ -404,22 +401,23 @@ impl BatchPartitioner {
                     }
                 }
 
-                for (output_idx, batches_to_concat) in
-                    batches_for_output_partitions.iter().enumerate()
-                {
-                    if !batches_to_concat.is_empty() {
-                        // All batches for a given output partition must have the same schema
-                        // as the first non-empty input batch.
-                        let concatenated_batch = arrow::compute::concat_batches(
-                            &schema, // Use schema from the first non-empty input batch
-                            batches_to_concat.iter(),
-                        )?;
-                        if concatenated_batch.num_rows() > 0 {
-                            result_vec.push(Ok((output_idx, concatenated_batch)));
-                        }
-                    }
-                }
-                Box::new(result_vec.into_iter())
+                Box::new(
+                    batches_for_output_partitions
+                        .into_iter()
+                        .enumerate()
+                        .filter_map(move |(output_idx, batches_to_concat)| {
+                            (!batches_to_concat.is_empty()).then(|| {
+                                // All batches for a given output partition must have the same schema
+                                // as the first non-empty input batch.
+                                let concatenated_batch =
+                                    arrow::compute::concat_batches(
+                                        &schema, // Use schema from the first non-empty input batch
+                                        batches_to_concat.iter(),
+                                    )?;
+                                Ok((output_idx, concatenated_batch))
+                            })
+                        }),
+                )
             }
         };
         Ok(it)
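
The rewritten body replaces the eagerly collected `result_vec` with a lazy `filter_map` over the per-partition buffers, concatenating each non-empty group with `arrow::compute::concat_batches` before it is yielded. As a standalone illustration of that concatenation step, the following sketch compiles against the `arrow` crate on its own; the schema, column name `v`, and sample values are invented for the example and are not taken from DataFusion.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::concat_batches;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    // Schema shared by every batch routed to one output partition.
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));

    // Two small batches that a hash repartition might assign to the same
    // output partition.
    let batch_a = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
    )?;
    let batch_b = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![3])) as ArrayRef],
    )?;

    // Concatenate them into a single output batch, as the repartition code
    // above does once per output partition.
    let combined = concat_batches(&schema, [&batch_a, &batch_b])?;
    assert_eq!(combined.num_rows(), 3);
    Ok(())
}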
