Skip to content

Commit 801760e

Browse files
authored
Merge pull request #1 from andygrove/coalesce
Coalesce
2 parents cd63de9 + f4b0c08 commit 801760e

File tree

1 file changed

+30
-11
lines changed

1 file changed

+30
-11
lines changed

native/core/src/execution/planner.rs

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio
7070

7171
use crate::execution::shuffle::CompressionCodec;
7272
use crate::execution::spark_plan::SparkPlan;
73+
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
7374
use datafusion_comet_proto::{
7475
spark_expression::{
7576
self, agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct, literal::Value, AggExpr,
@@ -1183,17 +1184,35 @@ impl PhysicalPlanner {
11831184
false,
11841185
)?);
11851186

1186-
Ok((
1187-
scans,
1188-
Arc::new(SparkPlan::new(
1189-
spark_plan.plan_id,
1190-
join,
1191-
vec![
1192-
Arc::clone(&join_params.left),
1193-
Arc::clone(&join_params.right),
1194-
],
1195-
)),
1196-
))
1187+
if join.filter.is_some() {
1188+
// SMJ with join filter produces lots of tiny batches
1189+
let coalesce_batches: Arc<dyn ExecutionPlan> =
1190+
Arc::new(CoalesceBatchesExec::new(join.clone(), 8192));
1191+
Ok((
1192+
scans,
1193+
Arc::new(SparkPlan::new_with_additional(
1194+
spark_plan.plan_id,
1195+
coalesce_batches,
1196+
vec![
1197+
Arc::clone(&join_params.left),
1198+
Arc::clone(&join_params.right),
1199+
],
1200+
vec![join],
1201+
)),
1202+
))
1203+
} else {
1204+
Ok((
1205+
scans,
1206+
Arc::new(SparkPlan::new(
1207+
spark_plan.plan_id,
1208+
join,
1209+
vec![
1210+
Arc::clone(&join_params.left),
1211+
Arc::clone(&join_params.right),
1212+
],
1213+
)),
1214+
))
1215+
}
11971216
}
11981217
OpStruct::HashJoin(join) => {
11991218
let (join_params, scans) = self.parse_join_parameters(

0 commit comments

Comments
 (0)