Describe the bug
I am working with the following physical plan:

```
=== Physical plan ===
CoalescePartitionsExec
  RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3)
    UnboundableExec: unbounded=true
```

Using a physical plan that combines `CoalescePartitionsExec` and `RepartitionExec` causes strange behavior when the input stream contains only one unique value.
The strange behavior:

- With the `CoalescePartitionsExec` + `RepartitionExec` plan, when a stream with only one unique value is provided, no data is read from the `RepartitionExec` until the stream is exhausted. Even calling `wake_receivers()` does not wake up the `DistributionReceiver`. This behavior is not observed without `CoalescePartitionsExec`.
- With more unique values there is no blocking; the problem occurs only when the stream contains exactly one unique value (see the sketch after the plan listings below for why a single value maps to a single output partition).
- Plans with blocking repartition:

  ```
  === Physical plan ===
  CoalescePartitionsExec
    ProjectionExec: expr=[a2@0 as a5]
      RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
        UnboundableExec: unbounded=false
  ```

  ```
  === Physical plan ===
  CoalescePartitionsExec
    RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
      UnboundableExec: unbounded=false
  ```

- Plan without blocking (executed via `plan.execute(2, task)`; which partition is unblocked can change according to the hash value):

  ```
  === Physical plan ===
  RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
    UnboundableExec: unbounded=false
  ```
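To see why a single unique value exercises only one output partition, here is a minimal sketch of hash partitioning. It uses std's `DefaultHasher` purely for illustration (DataFusion's actual hash function differs): every row with the same value hashes to the same bucket, so with one unique value all batches land in one of the 3 output partitions while the other two stay empty.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative only: assign a value to one of `n` hash partitions.
fn partition_of(value: u32, n: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish() % n
}

fn main() {
    // Every batch in the repro below carries only the value 1, so all rows
    // map to the same partition and the other RepartitionExec outputs stay empty.
    for v in [1u32, 1, 1, 2, 3] {
        println!("value {v} -> partition {}", partition_of(v, 3));
    }
}
```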
To Reproduce

- Create a file `datafusion/core/tests/repartition_exec_blocks.rs`
- Put this code in it:
```rust
use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::print_batches;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{
    displayable, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
    SendableRecordBatchStream,
};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::from_slice::FromSlice;
use datafusion_common::{Result, Statistics};
use datafusion_physical_expr::expressions::{col, Column};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use futures::{Stream, StreamExt};
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

/// A mock execution plan that produces `batch_produce` batches, or an
/// unbounded stream when `batch_produce` is `None`.
#[derive(Debug, Clone)]
pub struct MyUnboundedExec {
    batch_produce: Option<usize>,
    schema: Arc<Schema>,
    /// Ref-counting helper to check if the plan and the produced stream are still in memory.
    refs: Arc<()>,
}

impl MyUnboundedExec {
    pub fn new(batch_produce: Option<usize>, schema: Schema) -> Self {
        Self {
            batch_produce,
            schema: Arc::new(schema),
            refs: Default::default(),
        }
    }
}

impl ExecutionPlan for MyUnboundedExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(1)
    }

    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        None
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(UnboundedStream {
            batch_produce: self.batch_produce,
            count: 0,
            schema: Arc::clone(&self.schema),
            _refs: Arc::clone(&self.refs),
        }))
    }

    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default => {
                write!(
                    f,
                    "UnboundableExec: unbounded={}",
                    self.batch_produce.is_none(),
                )
            }
        }
    }

    fn statistics(&self) -> Statistics {
        Statistics::default()
    }
}

#[derive(Debug)]
pub struct UnboundedStream {
    batch_produce: Option<usize>,
    count: usize,
    /// Schema mocked by this stream.
    schema: SchemaRef,
    /// Ref-counting helper to check if the stream is still in memory.
    _refs: Arc<()>,
}

impl Stream for UnboundedStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if let Some(val) = self.batch_produce {
            if val <= self.count {
                println!("Stream Finished");
                return Poll::Ready(None);
            }
        }
        // Every batch carries the same single value, i.e. one unique hash key.
        let batch = RecordBatch::try_new(
            self.schema.clone(),
            vec![Arc::new(UInt32Array::from_slice([1]))],
        )?;
        self.count += 1;
        // Simulate a slow source.
        std::thread::sleep(std::time::Duration::from_millis(100));
        Poll::Ready(Some(Ok(batch)))
    }
}

impl RecordBatchStream for UnboundedStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

#[tokio::test(flavor = "multi_thread")]
async fn unbounded_repartition_sa() -> Result<()> {
    let config = SessionConfig::new();
    let ctx = SessionContext::with_config(config);
    let task = ctx.task_ctx();
    let schema = Schema::new(vec![Field::new("a2", DataType::UInt32, false)]);
    let input = Arc::new(MyUnboundedExec::new(Some(20), schema.clone())); // If you put None, it will be an unbounded source.
    let on: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a2", 0))];
    let plan = Arc::new(RepartitionExec::try_new(input, Partitioning::Hash(on, 3))?);
    let plan = Arc::new(ProjectionExec::try_new(
        vec![(col("a2", &schema)?, "a5".to_string())],
        plan.clone(),
    )?);
    let plan = Arc::new(CoalescePartitionsExec::new(plan.clone()));
    println!(
        "=== Physical plan ===\n{}\n",
        displayable(plan.as_ref()).indent()
    );
    let mut stream = plan.execute(0, task)?;
    while let Some(result) = stream.next().await {
        print_batches(&[result?.clone()])?;
    }
    Ok(())
}
```
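Assuming the file and test names above, the repro can be run with `cargo test --test repartition_exec_blocks unbounded_repartition_sa -- --nocapture`; the batches are printed only after the source logs `Stream Finished`, instead of streaming as they are produced.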
Expected behavior

The documentation of the repartitioning distribution channels states:

```
//! There are `N` virtual MPSC (multi-producer, single consumer) channels with unbounded capacity. However, if all
//! buffers/channels are non-empty, then a global gate will be closed preventing new data from being written (the
//! sender futures will be [pending](Poll::Pending)) until at least one channel is empty (and not closed).
```

- Since the gate does not block the senders in this case, I would expect the waker on the receiver to wake up after the `wake_receivers()` call.
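For reference, a minimal sketch of the gate condition quoted above (the names are illustrative, not DataFusion's actual types): the gate stays open, and senders may keep writing, as long as at least one channel buffer is empty.

```rust
use std::collections::VecDeque;

/// Illustrative only: the "global gate" from the doc comment above is open
/// while at least one channel buffer is empty; once every buffer is
/// non-empty, sender futures would return `Poll::Pending`.
fn gate_open<T>(buffers: &[VecDeque<T>]) -> bool {
    buffers.iter().any(|b| b.is_empty())
}

fn main() {
    let mut channels: Vec<VecDeque<u32>> = vec![VecDeque::new(); 3];
    // With one unique hash value only one channel ever receives data,
    // so the gate never closes and the senders are never blocked.
    channels[0].extend([1, 1, 1]);
    assert!(gate_open(&channels));
}
```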