Skip to content

Commit df5be43

Browse files
committed
[fix] removed parallelization
1 parent f764e0a commit df5be43

File tree

1 file changed

+4
-18
lines changed

1 file changed

+4
-18
lines changed

datafusion/src/physical_plan/planner.rs

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -309,9 +309,6 @@ impl DefaultPhysicalPlanner {
309309
async move {
310310
let batch_size = ctx_state.config.batch_size;
311311

312-
// TODO make this configurable
313-
let parallelism = 4;
314-
315312
let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan {
316313
LogicalPlan::TableScan {
317314
source,
@@ -599,13 +596,8 @@ impl DefaultPhysicalPlanner {
599596
Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?) )
600597
}
601598
LogicalPlan::Union { inputs, .. } => {
602-
let physical_plan_futures = inputs
603-
.iter()
604-
.map(|input| self.create_initial_plan(input, ctx_state))
605-
.collect::<Vec<_>>();
606-
607-
let physical_plans = futures::stream::iter(physical_plan_futures)
608-
.buffered(parallelism)
599+
let physical_plans = futures::stream::iter(inputs)
600+
.then(|lp| self.create_initial_plan(lp, ctx_state))
609601
.try_collect::<Vec<_>>()
610602
.await?;
611603
Ok(Arc::new(UnionExec::new(physical_plans)) )
@@ -782,14 +774,8 @@ impl DefaultPhysicalPlanner {
782774
Ok(Arc::new(AnalyzeExec::new(*verbose, input, schema)))
783775
}
784776
LogicalPlan::Extension { node } => {
785-
let physical_inputs_futures = node
786-
.inputs()
787-
.into_iter()
788-
.map(|input_plan| self.create_initial_plan(input_plan, ctx_state))
789-
.collect::<Vec<_>>();
790-
791-
let physical_inputs = futures::stream::iter(physical_inputs_futures)
792-
.buffered(parallelism)
777+
let physical_inputs = futures::stream::iter(node.inputs())
778+
.then(|lp| self.create_initial_plan(lp, ctx_state))
793779
.try_collect::<Vec<_>>()
794780
.await?;
795781

0 commit comments

Comments
 (0)