Strange Behaviour on RepartitionExec with CoalescePartitionsExec. #5278

@metesynnada

Describe the bug
I am working with the following physical plan:

=== Physical plan ===
CoalescePartitionsExec
  RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3)
    UnboundableExec: unbounded=true

Using this plan (CoalescePartitionsExec on top of RepartitionExec) causes strange behavior when the input stream contains only one unique value.

The strange behavior:

  • When a stream with only one unique value is provided, no data is read from the RepartitionExec until the stream is exhausted. Even calling wake_receivers() does not wake up the DistributionReceiver. This behavior is not observed without CoalescePartitionsExec.

  • If the stream contains more than one unique value, there is no blocking; the problem only occurs with a single unique value.

  • Plans with blocking repartition:

    === Physical plan ===
    CoalescePartitionsExec
      ProjectionExec: expr=[a2@0 as a5]
        RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
          UnboundableExec: unbounded=false
    
    === Physical plan ===
    CoalescePartitionsExec
      RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
        UnboundableExec: unbounded=false
    
  • Plan without blocking, when executing partition 2 directly with plan.execute(2, task) (the non-empty partition can change according to the hash value; see the sketch after this list):

    === Physical plan ===
    RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
      UnboundableExec: unbounded=false
    

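For comparison, here is a minimal sketch of the non-blocking variant (a hypothetical helper, not part of the repro; it assumes the MyUnboundedExec source defined in the repro code below). It builds only the RepartitionExec, without CoalescePartitionsExec, and drains one hash partition directly:

use std::sync::Arc;

use arrow::util::pretty::print_batches;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
use datafusion_common::Result;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalExpr;
use futures::StreamExt;

// Hypothetical helper: drain a single hash partition of a RepartitionExec.
async fn drain_one_partition(
    input: Arc<dyn ExecutionPlan>, // e.g. the MyUnboundedExec from the repro below
    task: Arc<TaskContext>,
) -> Result<()> {
    let on: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a2", 0))];
    let plan = Arc::new(RepartitionExec::try_new(input, Partitioning::Hash(on, 3))?);
    // With a single unique value, only one of the three hash partitions receives
    // data; partition 2 happens to be the non-empty one here, but that depends
    // on the hash of the value.
    let mut stream = plan.execute(2, task)?;
    while let Some(batch) = stream.next().await {
        print_batches(&[batch?])?;
    }
    Ok(())
}
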
To Reproduce

  • Create a file datafusion/core/tests/repartition_exec_blocks.rs
  • Put this code in it:
use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::print_batches;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{
    displayable, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
    SendableRecordBatchStream,
};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::from_slice::FromSlice;
use datafusion_common::{Result, Statistics};
use datafusion_physical_expr::expressions::{col, Column};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use futures::{Stream, StreamExt};
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

/// A mock execution plan that produces a stream of identical single-row batches; it is unbounded when `batch_produce` is `None`.
#[derive(Debug, Clone)]
pub struct MyUnboundedExec {
    batch_produce: Option<usize>,
    schema: Arc<Schema>,
    /// Ref-counting helper to check if the plan and the produced stream are still in memory.
    refs: Arc<()>,
}
impl MyUnboundedExec {
    pub fn new(batch_produce: Option<usize>, schema: Schema) -> Self {
        Self {
            batch_produce,
            schema: Arc::new(schema),
            refs: Default::default(),
        }
    }
}
impl ExecutionPlan for MyUnboundedExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(1)
    }

    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        None
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(UnboundedStream {
            batch_produce: self.batch_produce,
            count: 0,
            schema: Arc::clone(&self.schema),
            _refs: Arc::clone(&self.refs),
        }))
    }

    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default => {
                write!(
                    f,
                    "UnboundableExec: unbounded={}",
                    self.batch_produce.is_none(),
                )
            }
        }
    }

    fn statistics(&self) -> Statistics {
        Statistics::default()
    }
}

#[derive(Debug)]
pub struct UnboundedStream {
    batch_produce: Option<usize>,
    count: usize,
    /// Schema mocked by this stream.
    schema: SchemaRef,

    /// Ref-counting helper to check if the stream is still in memory.
    _refs: Arc<()>,
}

impl Stream for UnboundedStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if let Some(val) = self.batch_produce {
            if val <= self.count {
                println!("Stream Finished");
                return Poll::Ready(None);
            }
        }
        let batch = RecordBatch::try_new(
            self.schema.clone(),
            vec![Arc::new(UInt32Array::from_slice([1]))],
        )?;
        self.count += 1;
        // Simulate a slow source so that batches arrive gradually.
        std::thread::sleep(std::time::Duration::from_millis(100));
        Poll::Ready(Some(Ok(batch)))
    }
}

impl RecordBatchStream for UnboundedStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

#[tokio::test(flavor = "multi_thread")]
async fn unbounded_repartition_sa() -> Result<()> {
    let config = SessionConfig::new();
    let ctx = SessionContext::with_config(config);
    let task = ctx.task_ctx();
    let schema = Schema::new(vec![Field::new("a2", DataType::UInt32, false)]);
    let input = Arc::new(MyUnboundedExec::new(Some(20), schema.clone())); // If you pass None, it becomes an unbounded source.
    let on: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a2", 0))];
    let plan = Arc::new(RepartitionExec::try_new(input, Partitioning::Hash(on, 3))?);
    let plan = Arc::new(ProjectionExec::try_new(
        vec![(col("a2", &schema)?, "a5".to_string())],
        plan.clone(),
    )?);
    let plan = Arc::new(CoalescePartitionsExec::new(plan.clone()));
    println!(
        "=== Physical plan ===\n{}\n",
        displayable(plan.as_ref()).indent()
    );
    let mut stream = plan.execute(0, task)?;
    while let Some(result) = stream.next().await {
        print_batches(&[result?.clone()])?;
    }
    Ok(())
}
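
Assuming the file is placed at datafusion/core/tests/repartition_exec_blocks.rs as described above, the test should be runnable with something like

    cargo test -p datafusion --test repartition_exec_blocks

With the blocking plans no batches are printed until the source is exhausted; with the non-blocking plan, batches are printed as they arrive.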

Expected behavior

//! There are `N` virtual MPSC (multi-producer, single consumer) channels with unbounded capacity. However, if all
//! buffers/channels are non-empty, than a global gate will be closed preventing new data from being written (the
//! sender futures will be [pending](Poll::Pending)) until at least one channel is empty (and not closed).
  • Since the senders are not blocked here (only one of the three channels is non-empty), I would expect the waker on the receiver to wake up after the wake_receivers() call.
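
To illustrate the contract assumed here, below is a minimal, self-contained sketch (not DataFusion code; Gate, WaitForData, and notify are made-up names for illustration) of the register/wake pattern: the pending receiver registers its waker, and a later wake, analogous to wake_receivers(), should cause it to be polled again so it can observe the new data.

use futures::task::AtomicWaker;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

/// Hypothetical stand-in for a receiver that parks until it is notified.
struct Gate {
    ready: AtomicBool,
    waker: AtomicWaker,
}

struct WaitForData(Arc<Gate>);

impl Future for WaitForData {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Register the waker before checking the flag to avoid a lost wakeup.
        self.0.waker.register(cx.waker());
        if self.0.ready.load(Ordering::SeqCst) {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

/// Analogous to wake_receivers(): publish the data, then wake the receiver
/// so the executor polls it again.
fn notify(gate: &Gate) {
    gate.ready.store(true, Ordering::SeqCst);
    gate.waker.wake();
}

#[tokio::main]
async fn main() {
    let gate = Arc::new(Gate {
        ready: AtomicBool::new(false),
        waker: AtomicWaker::new(),
    });
    let receiver = tokio::spawn(WaitForData(Arc::clone(&gate)));
    notify(&gate);
    // If the wake is delivered correctly, this completes right away rather
    // than only after the producer side is exhausted.
    receiver.await.unwrap();
}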

Additional context

cc @crepererum @alamb @tustvold
