Background
PR #1676 (tracking #1677) introduces ColocatedJoinRule (skip the shuffle when both join inputs are co-bucketed) and BroadcastSmallSideRule (promote Partitioned to CollectLeft when one side fits under a threshold). Both rules currently only match HashJoinExec.
PR #1651 made SortMergeJoinExec the default join in Ballista (issue #1648), because DataFusion's hash join has no spill support. Users now opt back into hash join via SET datafusion.optimizer.prefer_hash_join = true.
The combined effect: under default settings, neither of the rules from #1676 fires for the planned SortMergeJoinExec, so co-bucketed tables still get a network shuffle and small-side joins still get repartitioned.
Proposed work
ColocatedJoinRule: extend the matcher in try_elide_join_repartitions to handle SortMergeJoinExec in addition to HashJoinExec. The hash-distribution-satisfaction logic (matching keys, equal or divisor-related bucket counts) is the same; only the rewrite step (stripping the RepartitionExec and/or wrapping a side in BucketSubPartitionExec) needs a new SortMergeJoinExec variant. Note: SortMergeJoinExec requires sorted inputs, so the SortExec above each input must be preserved or re-derived after the rewrite. Verify that it doesn't get lost when the underlying source already advertises a compatible sort order.
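A minimal sketch of the checks described above, modelled on plain data rather than ExecutionPlan nodes. It assumes each input advertises the columns it is bucketed on plus its bucket count, that bucket = hash(keys) % num_buckets with the same hash on both sides, and it ignores sort direction and null ordering. Bucketing, Colocation, check_colocation, matching_coarse_bucket and needs_sort_exec are hypothetical names, not Ballista or DataFusion APIs. It also does not decide which side gets the BucketSubPartitionExec; it only reports the two counts.

```rust
// Hypothetical model of the checks ColocatedJoinRule needs once it also
// matches SortMergeJoinExec. None of these names are Ballista/DataFusion APIs.

/// Bucketing metadata advertised by one join input (assumed shape).
#[derive(Debug, Clone)]
struct Bucketing {
    keys: Vec<&'static str>,
    num_buckets: usize,
}

/// Outcome of the hash-distribution-satisfaction check.
#[derive(Debug, PartialEq)]
enum Colocation {
    /// Same keys, same bucket count: both RepartitionExecs can be stripped.
    Equal,
    /// One count divides the other: one side needs a BucketSubPartitionExec.
    Divisor { fewer: usize, more: usize },
    /// Not co-bucketed on the join keys: keep the shuffle.
    NotColocated,
}

/// Identical for HashJoinExec and SortMergeJoinExec: each side must be
/// bucketed on exactly its join keys (in order), and the bucket counts must
/// be equal or divisor-related. Assumes both sides use the same hash function.
fn check_colocation(
    left: &Bucketing,
    right: &Bucketing,
    left_join_keys: &[&str],
    right_join_keys: &[&str],
) -> Colocation {
    if left.keys != left_join_keys || right.keys != right_join_keys {
        return Colocation::NotColocated;
    }
    let fewer = left.num_buckets.min(right.num_buckets);
    let more = left.num_buckets.max(right.num_buckets);
    if fewer == 0 {
        Colocation::NotColocated
    } else if fewer == more {
        Colocation::Equal
    } else if more % fewer == 0 {
        Colocation::Divisor { fewer, more }
    } else {
        Colocation::NotColocated
    }
}

/// Why divisor-related counts line up, assuming bucket = hash % num_buckets:
/// if hash % more == b and fewer divides more, then hash % fewer == b % fewer.
fn matching_coarse_bucket(fine_bucket: usize, fewer: usize) -> usize {
    fine_bucket % fewer
}

/// Sort-merge specific: after the RepartitionExec is stripped, a SortExec is
/// still required unless the source's advertised ordering already starts with
/// the join keys. Simplified: sort direction and null ordering are ignored.
fn needs_sort_exec(join_keys: &[&str], advertised_ordering: &[&str]) -> bool {
    !advertised_ordering.starts_with(join_keys)
}

fn main() {
    let orders = Bucketing { keys: vec!["customer_id"], num_buckets: 8 };
    let customers = Bucketing { keys: vec!["id"], num_buckets: 4 };
    let c = check_colocation(&orders, &customers, &["customer_id"], &["id"]);
    println!("{c:?}"); // Divisor { fewer: 4, more: 8 }
    // Bucket 6 of the 8-bucket side pairs with bucket 2 of the 4-bucket side.
    println!("{}", matching_coarse_bucket(6, 4)); // 2
    // Source already sorted on (id, name): no extra SortExec needed.
    println!("{}", needs_sort_exec(&["id"], &["id", "name"])); // false
}
```

Keeping the distribution check separate from needs_sort_exec mirrors the split in the proposal: the first part can be shared with the HashJoinExec path unchanged, and only the sort handling is new for sort-merge.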
BroadcastSmallSideRule: broadcast doesn't apply to sort-merge in the same way, because SortMergeJoinExec has no CollectLeft mode. Instead, when one side is small enough to fit under the threshold, consider lowering directly to HashJoinExec(CollectLeft). That is a join-mode swap, not just a partition-mode swap, and needs a small spike to confirm it is sound for sort-merge inputs.
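The size check behind that lowering could look like the sketch below. It assumes per-side size estimates (None when statistics are missing) and a threshold in the same unit; JoinChoice and choose_join are hypothetical names, not the rule's actual API. It deliberately ignores join type: when the small side is on the right, swapping inputs would also mean flipping the join type (for example Left to Right) and restoring the original column order, which is likely part of what the spike needs to check.

```rust
// Hypothetical model of the size check BroadcastSmallSideRule could apply
// when the planned join is SortMergeJoinExec. Not a Ballista/DataFusion API.

/// What the rule decides to plan instead of the original SortMergeJoinExec.
#[derive(Debug, PartialEq)]
enum JoinChoice {
    /// No side is known to fit under the threshold: keep sort-merge.
    KeepSortMerge,
    /// Lower to HashJoinExec(CollectLeft). `swap_inputs` is true when the
    /// small side is currently on the right and must become the build (left)
    /// side. Join-type flipping after a swap is not modelled here.
    HashCollectLeft { swap_inputs: bool },
}

/// `left_bytes` / `right_bytes` are size estimates (None = no statistics);
/// `threshold` is in the same unit. Unknown sizes never count as "small".
fn choose_join(left_bytes: Option<u64>, right_bytes: Option<u64>, threshold: u64) -> JoinChoice {
    let small_left = left_bytes.filter(|&b| b <= threshold);
    let small_right = right_bytes.filter(|&b| b <= threshold);
    match (small_left, small_right) {
        // Both fit: build on the smaller one.
        (Some(l), Some(r)) => JoinChoice::HashCollectLeft { swap_inputs: r < l },
        (Some(_), None) => JoinChoice::HashCollectLeft { swap_inputs: false },
        (None, Some(_)) => JoinChoice::HashCollectLeft { swap_inputs: true },
        (None, None) => JoinChoice::KeepSortMerge,
    }
}

const MIB: u64 = 1024 * 1024;

fn main() {
    // Large fact table on the left, 2 MiB dimension table on the right.
    let choice = choose_join(Some(10 * 1024 * MIB), Some(2 * MIB), 10 * MIB);
    println!("{choice:?}"); // HashCollectLeft { swap_inputs: true }
    // Missing statistics on both sides: stay with the planned sort-merge join.
    println!("{:?}", choose_join(None, None, 10 * MIB)); // KeepSortMerge
}
```

Treating missing statistics as "too big" keeps the rule conservative: guessing wrong in the other direction would collect an arbitrarily large side into memory with a hash join that cannot spill, which is the situation the sort-merge default was meant to avoid.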
Tests: add e2e snapshot cases mirroring the three in colocated_join_e2e.rs (matching → no exchange; divisor → BucketSubPartitionExec; unbucketed control), but with the default prefer_hash_join = false so that the planned join is SortMergeJoinExec.
Why this matters
Without this extension, the optimizations from #1676 are effectively gated behind a non-default config (prefer_hash_join = true). Users running Ballista with default settings against pre-bucketed tables won't benefit from colocation or small-side broadcast.
References
🤖 Generated with Claude Code