
Extend ColocatedJoinRule and BroadcastSmallSideRule to SortMergeJoinExec #1679

@wirybeaver

Description


Background

PR #1676 (tracking #1677) introduces ColocatedJoinRule (skip the shuffle when both join inputs are co-bucketed) and BroadcastSmallSideRule (promote Partitioned to CollectLeft when one side fits under a threshold). Both rules currently only match HashJoinExec.
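Below is a minimal Rust sketch of the two decisions as this issue describes them. Everything in it (`Bucketing`, `Colocation`, `colocation`, `broadcast`, the byte-size estimates) is a hypothetical stand-in, not Ballista's API. It also rests on two assumptions: a divisor mismatch is resolved by sub-partitioning the side with fewer buckets, and the inputs of an inner join may be swapped so the small side becomes the build (left) side.

```rust
// Hypothetical, simplified model of the two #1676 decisions.
// None of these names are Ballista APIs.

#[derive(Debug, Clone, Copy, PartialEq)]
enum Side {
    Left,
    Right,
}

/// How one join input is hash-bucketed at the source (hypothetical).
#[derive(Debug, Clone)]
struct Bucketing {
    keys: Vec<String>,
    count: usize,
}

#[derive(Debug, PartialEq)]
enum Colocation {
    /// Same keys and bucket count: the RepartitionExec on both sides can go.
    ElideRepartition,
    /// One count divides the other: sub-partition the coarser side by `factor`
    /// (assumed to be the role of BucketSubPartitionExec).
    SubPartition { side: Side, factor: usize },
    /// Not co-bucketed on the join keys: keep the shuffle.
    Shuffle,
}

/// True if the input is bucketed on exactly these join keys, in order.
fn bucketed_on(b: &Bucketing, join_keys: &[&str]) -> bool {
    b.count > 0 && b.keys.iter().map(String::as_str).eq(join_keys.iter().copied())
}

fn colocation(
    left: Option<&Bucketing>,
    right: Option<&Bucketing>,
    left_keys: &[&str],
    right_keys: &[&str],
) -> Colocation {
    let (l, r) = match (left, right) {
        (Some(l), Some(r)) if bucketed_on(l, left_keys) && bucketed_on(r, right_keys) => {
            (l.count, r.count)
        }
        _ => return Colocation::Shuffle,
    };
    if l == r {
        Colocation::ElideRepartition
    } else if l % r == 0 {
        // Right side has fewer buckets: split each of them `l / r` ways.
        Colocation::SubPartition { side: Side::Right, factor: l / r }
    } else if r % l == 0 {
        Colocation::SubPartition { side: Side::Left, factor: r / l }
    } else {
        Colocation::Shuffle
    }
}

#[derive(Debug, PartialEq)]
enum PartitionMode {
    Partitioned,
    CollectLeft,
}

/// Promote to CollectLeft when one side's estimated size is under the threshold.
/// The bool says whether inputs must be swapped so the small side is the build
/// (left) side; swapping is assumed legal here (e.g. an inner join).
fn broadcast(left_bytes: u64, right_bytes: u64, threshold: u64) -> (PartitionMode, bool) {
    if left_bytes <= threshold && left_bytes <= right_bytes {
        (PartitionMode::CollectLeft, false)
    } else if right_bytes <= threshold {
        (PartitionMode::CollectLeft, true)
    } else {
        (PartitionMode::Partitioned, false)
    }
}
```

The divisor case works because, when r divides l, (h mod l) mod r = h mod r, so every bucket of the finer side falls inside exactly one bucket of the coarser side. That only holds if both sides hash the keys with the same function, which the sketch takes for granted.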

PR #1651 made SortMergeJoinExec the default join in Ballista (issue #1648), because DataFusion's hash join has no spill support. Users now opt back into hash join via SET datafusion.optimizer.prefer_hash_join = true.

The combined effect: under default settings, neither rule from #1676 fires for the planned SortMergeJoinExec, so co-bucketed tables still get a network shuffle and joins with a small side are still repartitioned.
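A deliberately tiny Rust model of that interaction follows. `JoinOperator`, `planned_join`, and `rules_from_1676_apply` are hypothetical names. The real planner's choice depends on more than this one flag, which the sketch ignores.

```rust
// Hypothetical model: planner choice after #1651 plus the current
// HashJoinExec-only matcher of the #1676 rules.

#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinOperator {
    HashJoinExec,
    SortMergeJoinExec,
}

/// Equi-join operator picked for a given value of
/// `datafusion.optimizer.prefer_hash_join` (false by default, as implied above).
fn planned_join(prefer_hash_join: bool) -> JoinOperator {
    if prefer_hash_join {
        JoinOperator::HashJoinExec
    } else {
        JoinOperator::SortMergeJoinExec
    }
}

/// Both #1676 rules currently match only HashJoinExec.
fn rules_from_1676_apply(op: JoinOperator) -> bool {
    matches!(op, JoinOperator::HashJoinExec)
}
```

With the default `planned_join(false)`, `rules_from_1676_apply` returns `false`, which is exactly the gap this issue describes.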

Proposed work

  1. ColocatedJoinRule — extend the matcher in try_elide_join_repartitions to handle SortMergeJoinExec in addition to HashJoinExec. The hash-distribution-satisfaction logic (matching keys, equal or divisor-related bucket counts) is the same; only the rewrite step (stripping the RepartitionExec and/or wrapping a side in BucketSubPartitionExec) needs the new variant. Note: SortMergeJoinExec requires sorted inputs, so the SortExec above each input needs to be preserved or re-derived after the rewrite — verify it doesn't get lost when the underlying source already advertises a compatible sort order.
  2. BroadcastSmallSideRule — broadcast doesn't carry over to sort-merge directly, because SortMergeJoinExec has no CollectLeft mode. Instead, when one side fits under the broadcast threshold, consider rewriting the join to HashJoinExec(CollectLeft) directly. This is a join-operator swap, not just a partition-mode swap, so it needs a small spike to confirm it is sound for sort-merge inputs.
  3. Tests — add e2e snapshot cases mirroring the three in colocated_join_e2e.rs (matching → no exchange; divisor → BucketSubPartitionExec; unbucketed control), but with the default prefer_hash_join = false so the planned join is SortMergeJoinExec.
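A rough Rust sketch of items 1 and 2 on a toy plan tree follows. The `Plan` enum and every function in it are hypothetical stand-ins for the operators named above, not Ballista or DataFusion types. The sketch assumes join keys were already matched to bucket keys, that colocation is preferred over broadcast, that the join is an inner join, and that nothing above the join depends on the sort-merge output order (the point the spike in item 2 would need to confirm). It always keeps the SortExec and does not model eliding it when the source already advertises a compatible order.

```rust
// Hypothetical, heavily simplified plan tree standing in for a source scan,
// RepartitionExec, SortExec, BucketSubPartitionExec, HashJoinExec(CollectLeft)
// and SortMergeJoinExec. Not real Ballista types.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { table: String, buckets: Option<usize>, bytes: u64 },
    Repartition(Box<Plan>),
    Sort(Box<Plan>),
    BucketSubPartition { input: Box<Plan>, factor: usize },
    HashJoinCollectLeft { build: Box<Plan>, probe: Box<Plan> },
    SortMergeJoin { left: Box<Plan>, right: Box<Plan> },
}

fn smj(left: Plan, right: Plan) -> Plan {
    Plan::SortMergeJoin { left: Box::new(left), right: Box::new(right) }
}

/// Bucket count and estimated size of the scan under an input, if any.
fn leaf(p: &Plan) -> Option<(Option<usize>, u64)> {
    match p {
        Plan::Scan { buckets, bytes, .. } => Some((*buckets, *bytes)),
        Plan::Sort(i) | Plan::Repartition(i) => leaf(i),
        _ => None,
    }
}

/// Sort(Repartition(x)) -> Sort(x): drop the exchange, keep the sort.
fn strip_repartition(p: Plan) -> Plan {
    match p {
        Plan::Sort(inner) => Plan::Sort(Box::new(strip_repartition(*inner))),
        Plan::Repartition(inner) => *inner,
        other => other,
    }
}

/// Sort(Repartition(x)) -> x: a hash join needs neither.
fn strip_sort_and_repartition(p: Plan) -> Plan {
    match p {
        Plan::Sort(inner) | Plan::Repartition(inner) => strip_sort_and_repartition(*inner),
        other => other,
    }
}

/// Sort(Repartition(x)) -> Sort(BucketSubPartition(x)): the sort stays on top.
fn sub_partition(p: Plan, factor: usize) -> Plan {
    match strip_repartition(p) {
        Plan::Sort(inner) => Plan::Sort(Box::new(Plan::BucketSubPartition { input: inner, factor })),
        other => Plan::BucketSubPartition { input: Box::new(other), factor },
    }
}

fn rewrite_sort_merge(plan: Plan, broadcast_threshold: u64) -> Plan {
    let (left, right) = match plan {
        Plan::SortMergeJoin { left, right } => (*left, *right),
        other => return other,
    };
    let (l_info, r_info) = (leaf(&left), leaf(&right));
    let (Some((lb, lbytes)), Some((rb, rbytes))) = (l_info, r_info) else {
        return smj(left, right);
    };
    // 1. Co-bucketed: remove the exchange but keep each SortExec, because the
    //    sort-merge join still needs sorted rows within every partition.
    if let (Some(l), Some(r)) = (lb, rb) {
        if l == r {
            return smj(strip_repartition(left), strip_repartition(right));
        }
        if l > 0 && r > 0 && (l % r == 0 || r % l == 0) {
            return if l > r {
                smj(strip_repartition(left), sub_partition(right, l / r))
            } else {
                smj(sub_partition(left, r / l), strip_repartition(right))
            };
        }
    }
    // 2. Small side: no CollectLeft sort-merge join exists, so swap the join
    //    operator itself, with the smaller side as the build side.
    if lbytes.min(rbytes) <= broadcast_threshold {
        let (build, probe) = if lbytes <= rbytes { (left, right) } else { (right, left) };
        return Plan::HashJoinCollectLeft {
            build: Box::new(strip_sort_and_repartition(build)),
            probe: Box::new(strip_sort_and_repartition(probe)),
        };
    }
    smj(left, right)
}
```

For example, with both inputs shaped `Sort(Repartition(Scan))` and bucketed into 8 and 4 buckets, the rewrite yields a sort-merge join whose right input becomes `Sort(BucketSubPartition { factor: 2, .. })`. Unbucketed inputs above the threshold come back unchanged, which matches the control case in item 3.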

Why this matters

Without this extension, the optimizations from #1676 are effectively gated behind a non-default config (prefer_hash_join = true). Users running Ballista with default settings against pre-bucketed tables won't benefit from colocation or small-side broadcast.


🤖 Generated with Claude Code

Metadata

Assignees: no one assigned
Labels: none
Type: none
Projects: none
Milestone: none
Relationships: none yet
Development: no branches or pull requests