datafusion/core/src/execution/context.rs (4 changes: 2 additions & 2 deletions)
@@ -72,7 +72,6 @@ use crate::optimizer::to_approx_perc::ToApproxPerc;

use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use crate::physical_optimizer::repartition::Repartition;

use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::logical_plan::plan::Explain;
@@ -1164,7 +1163,8 @@ impl SessionState {
Arc::new(AggregateStatistics::new()),
Arc::new(HashBuildProbeOrder::new()),
Arc::new(CoalesceBatches::new()),
Arc::new(Repartition::new()),
// FIXME: Repartition is temporarily disabled to avoid issues with subqueries
//Arc::new(Repartition::new()),
Arc::new(AddCoalescePartitionsExec::new()),
],
query_planner: Arc::new(DefaultQueryPlanner {}),
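The disabled rule and the second file are connected: SubqueryExec always reads a scalar subquery from partition 0 and rejects any subquery plan whose output partitioning has more than one partition, so letting the Repartition rule split a subquery plan across partitions would make that check fail at execution time. A minimal sketch of the invariant, assuming DataFusion's public ExecutionPlan trait; validate_single_partition is a hypothetical helper for illustration, not part of this PR:

    use std::sync::Arc;

    use datafusion::error::{DataFusionError, Result};
    use datafusion::physical_plan::ExecutionPlan;

    // Hypothetical helper: SubqueryExec executes each subquery with
    // `subquery.execute(0, context)`, so the plan must expose exactly one
    // partition. A plan rewritten by `Repartition` into N > 1 partitions
    // would be rejected here, mirroring the check in subquery.rs below.
    fn validate_single_partition(plan: &Arc<dyn ExecutionPlan>) -> Result<()> {
        let partitions = plan.output_partitioning().partition_count();
        if partitions != 1 {
            return Err(DataFusionError::Plan(format!(
                "Sub query should have only one partition but got {}",
                partitions
            )));
        }
        Ok(())
    }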
datafusion/core/src/physical_plan/subquery.rs (93 changes: 61 additions & 32 deletions)
@@ -28,6 +28,7 @@ use std::task::{Context, Poll};

use crate::error::{DataFusionError, Result};
use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
use arrow::array::new_null_array;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
@@ -151,43 +152,71 @@ impl ExecutionPlan for SubqueryExec {
let context = context.clone();
let size_hint = stream.size_hint();
let schema = self.schema.clone();
let res_stream = stream.then(move |batch| {
let cursor = cursor.clone();
let context = context.clone();
let subqueries = subqueries.clone();
let schema = schema.clone();
async move {
let batch = batch?;
let b = Arc::new(batch.clone());
cursor.set_batch(b)?;
let mut subquery_arrays = vec![Vec::new(); subqueries.len()];
for i in 0..batch.num_rows() {
cursor.set_position(i)?;
for (subquery_i, subquery) in subqueries.iter().enumerate() {
if subquery.output_partitioning().partition_count() != 1 {
return Err(ArrowError::ComputeError(format!("Sub query should have only one partition but got {}", subquery.output_partitioning().partition_count())))
}
let mut stream = subquery.execute(0, context.clone()).await?;
let res = stream.next().await;
if let Some(subquery_batch) = res {
let subquery_batch = subquery_batch?;
if subquery_batch.column(0).len() != 1 {
return Err(ArrowError::ComputeError("Sub query should return exactly one row".to_string()))
let res_stream =
stream.then(move |batch| {
let cursor = cursor.clone();
let context = context.clone();
let subqueries = subqueries.clone();
let schema = schema.clone();
async move {
let batch = batch?;
let b = Arc::new(batch.clone());
cursor.set_batch(b)?;
let mut subquery_arrays = vec![Vec::new(); subqueries.len()];
for i in 0..batch.num_rows() {
cursor.set_position(i)?;
for (subquery_i, subquery) in subqueries.iter().enumerate() {
let null_array = || {
let schema = subquery.schema();
let fields = schema.fields();
if fields.len() != 1 {
return Err(ArrowError::ComputeError(format!(
"Sub query should have only one column but got {}",
fields.len()
)));
}

let data_type = fields.get(0).unwrap().data_type();
Ok(new_null_array(data_type, 1))
};

if subquery.output_partitioning().partition_count() != 1 {
return Err(ArrowError::ComputeError(format!(
"Sub query should have only one partition but got {}",
subquery.output_partitioning().partition_count()
)));
}
let mut stream = subquery.execute(0, context.clone()).await?;
let res = stream.next().await;
if let Some(subquery_batch) = res {
let subquery_batch = subquery_batch?;
match subquery_batch.column(0).len() {
0 => subquery_arrays[subquery_i].push(null_array()?),
1 => subquery_arrays[subquery_i]
.push(subquery_batch.column(0).clone()),
_ => return Err(ArrowError::ComputeError(
"Sub query should return no more than one row"
.to_string(),
)),
};
} else {
subquery_arrays[subquery_i].push(subquery_batch.column(0).clone());
subquery_arrays[subquery_i].push(null_array()?);
}
} else {
return Err(ArrowError::ComputeError("Sub query returned empty result set but exactly one row is expected".to_string()))
}
}
let mut new_columns = batch.columns().to_vec();
for subquery_array in subquery_arrays {
new_columns.push(concat(
subquery_array
.iter()
.map(|a| a.as_ref())
.collect::<Vec<_>>()
.as_slice(),
)?);
}
RecordBatch::try_new(schema.clone(), new_columns)
}
let mut new_columns = batch.columns().to_vec();
for subquery_array in subquery_arrays {
new_columns.push(concat(subquery_array.iter().map(|a| a.as_ref()).collect::<Vec<_>>().as_slice())?);
}
RecordBatch::try_new(schema.clone(), new_columns)
}
});
});
Ok(Box::pin(SubQueryStream {
schema: self.schema.clone(),
stream: Box::pin(res_stream),
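The behavioural change in this hunk: previously a scalar subquery had to return exactly one row, and an empty result set was a hard error; now an empty first batch or an exhausted stream contributes a one-element all-null array, built with arrow's new_null_array from the subquery's single column type, while more than one row is still rejected. That matches SQL scalar-subquery semantics, where a subquery yielding no row evaluates to NULL. A hedged sketch of the null-fill idea, assuming only the arrow crate; scalar_or_null is a hypothetical standalone analogue of the null_array closure in the diff:

    use arrow::array::{new_null_array, ArrayRef};
    use arrow::datatypes::DataType;

    // Hypothetical analogue of the `null_array` closure: when the subquery
    // produced no value for the current outer row, substitute a 1-row
    // all-null array of the subquery column's type.
    fn scalar_or_null(value: Option<ArrayRef>, data_type: &DataType) -> ArrayRef {
        value.unwrap_or_else(|| new_null_array(data_type, 1))
    }

    fn main() {
        // No subquery result: the outer row gets a single NULL of type Int64.
        let filled = scalar_or_null(None, &DataType::Int64);
        assert_eq!(filled.len(), 1);
        assert_eq!(filled.null_count(), 1);
    }

Note that the subquery is still re-executed inside the per-row loop (cursor.set_position(i) followed by subquery.execute(0, ...)), so evaluation cost scales with the number of rows in the outer batch.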