
Optimize NestedLoopJoinExec Memory Usage #16364

Open
@UBarney


Is your feature request related to a problem or challenge?

The current Nested Loop Join implementation follows this simplified logic:

  1. Buffer the Build Side: All data from the left (build) side of the join is collected and held in memory.
  2. Iterate the Probe Side: The operator iterates through the right (probe) side, processing one RecordBatch at a time.
  3. Process Batch Pairs: For each buffered RecordBatch from the left side and the incoming RecordBatch from the right side, the core join logic is executed:
    i. Create the Cartesian product of the two input batches and apply the filter, producing (left_side_indices, right_side_indices).
    ii. Adjust the indices for the join type (adjust_indices_by_join_type).
    iii. Build the output batch with build_batch_from_indices(left_side_indices, right_side_indices).
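A hypothetical, heavily simplified model of step 3 makes it easier to see where the memory goes. It assumes an inner join (so adjust_indices_by_join_type is a no-op and is omitted), models each column as a Vec<i64> instead of an Arrow array, and uses u64 for left indices and u32 for right indices as an assumption about the real index types. The names cartesian_then_filter and build_output are illustrative only and are not DataFusion APIs.

```rust
/// Hypothetical model of step 3.i: materialize every (left, right) index
/// pair, build an "intermediate batch" from them, then apply the filter.
fn cartesian_then_filter(
    left: &[i64],
    right: &[i64],
    filter: impl Fn(i64, i64) -> bool,
) -> (Vec<u64>, Vec<u32>) {
    let n = left.len() * right.len();
    // All n index pairs are allocated before the filter runs.
    let mut left_idx: Vec<u64> = Vec::with_capacity(n);
    let mut right_idx: Vec<u32> = Vec::with_capacity(n);
    for l in 0..left.len() {
        for r in 0..right.len() {
            left_idx.push(l as u64);
            right_idx.push(r as u32);
        }
    }
    // The intermediate "batch" gathers the filter's input columns: n rows again.
    let intermediate: Vec<(i64, i64)> = left_idx
        .iter()
        .zip(&right_idx)
        .map(|(&l, &r)| (left[l as usize], right[r as usize]))
        .collect();
    // Keep only the index pairs whose row satisfies the filter.
    let mut kept_left = Vec::new();
    let mut kept_right = Vec::new();
    for (i, &(lv, rv)) in intermediate.iter().enumerate() {
        if filter(lv, rv) {
            kept_left.push(left_idx[i]);
            kept_right.push(right_idx[i]);
        }
    }
    (kept_left, kept_right)
}

/// Hypothetical model of step 3.iii: gather output values for every matched
/// pair in one go, so the output can be as large as the Cartesian product.
fn build_output(left: &[i64], left_idx: &[u64]) -> Vec<i64> {
    left_idx.iter().map(|&i| left[i as usize]).collect()
}
```

Both the index vectors and the intermediate vector have left.len() * right.len() entries regardless of how selective the filter is.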

This approach has the following problems:

  • It materializes all index pairs of the Cartesian product of the two input batches in memory at once, i.e. left_batch.num_rows() * right_batch.num_rows() pairs.
  • It may create extremely large RecordBatches in a single step:
    • In 3.i, if the query has a filter, it calls build_batch_from_indices(left_side_indices, right_side_indices) to build an intermediate RecordBatch for filter evaluation. This batch has left_batch.num_rows() * right_batch.num_rows() rows.
    • In 3.iii, the output batch can have up to left_batch.num_rows() * right_batch.num_rows() rows.

I added some logging to show this:

> select t1.value from range(40960) t1 join range(8192) t2 on t1.value + t2.value < t1.value * t2.value;
[datafusion/physical-plan/src/joins/utils.rs:864:5] intermediate_batch.num_rows() = 335544320
[datafusion/physical-plan/src/joins/utils.rs:864:5] intermediate_batch.get_array_memory_size() = 5368709312
[datafusion/physical-plan/src/joins/nested_loop_join.rs:906:17] result.num_rows() = 335446019
[datafusion/physical-plan/src/joins/nested_loop_join.rs:906:17] result.get_array_memory_size() = 2683568248
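The logged sizes are consistent with simple arithmetic. The sketch below assumes that range(n) yields a single non-null Int64 column, that the filter's intermediate batch carries the two columns t1.value and t2.value, and that the result carries only t1.value. The helper names are hypothetical. Under those assumptions the intermediate batch is 335,544,320 × 16 bytes = 5,368,709,120 bytes (5 GiB) and the result is 335,446,019 × 8 bytes = 2,683,568,152 bytes (about 2.5 GiB); the logged values are a few hundred bytes larger, presumably buffer overhead. If the index arrays are u64 (left) and u32 (right), which is also an assumption, the index pairs alone take about another 3.75 GiB.

```rust
/// Hypothetical back-of-the-envelope helpers for the logged query.
/// Assumes non-null Int64 columns (8 bytes per value) and ignores Arrow
/// buffer padding, validity bitmaps and metadata.
const INT64_BYTES: u64 = 8;

/// Candidate (left, right) pairs produced for one pair of batches.
fn candidate_pairs(left_rows: u64, right_rows: u64) -> u64 {
    left_rows * right_rows
}

/// Approximate payload of a batch with `rows` rows and `int64_cols` columns.
fn int64_payload_bytes(rows: u64, int64_cols: u64) -> u64 {
    rows * int64_cols * INT64_BYTES
}

/// Approximate size of the index arrays, assuming u64 left indices and
/// u32 right indices (an assumption about the real index types).
fn index_pair_bytes(pairs: u64) -> u64 {
    pairs * (8 + 4)
}
```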

Describe the solution you'd like

  1. Process the Cartesian Product Incrementally: At any given time, the operator only generates the Cartesian product of a slice of the left batch against a slice of the right batch.

    1. Special Handling for Right Joins: For Right, RightSemi, and RightAnti joins, a probe_side_bitmap must be maintained. The bitmap is updated as each chunk is processed to track which rows from the right side have found a match. After all chunks have been evaluated, the bitmap determines the remaining right-side output: the unmatched right rows for Right joins (padded with nulls) and RightAnti joins, and the matched right rows for RightSemi joins.
  2. Limit intermediate_batch Size During Filtering: When applying the join filter in apply_join_filter_to_indices, avoid creating a single, massive intermediate_batch for evaluation. Instead, process the indices in batches:

    1. Iteratively build smaller intermediate batches by calling build_batch_from_indices on slices of the input indices (e.g., build_indices.slice(i, N) and probe_indices.slice(i, N)).
    2. Apply the filter expression to each small intermediate batch.
    3. Concatenate the filtered results from each chunk to produce the final set of matched indices.
  3. Yield Partial Batches on Demand: On each call, the stream uses its cursor to process the next chunk of indices (e.g., from i to i + N), then builds and returns a small RecordBatch from only that slice.
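Item 1 can be sketched in the same simplified model: loop over tiles of at most left_chunk × right_chunk pairs, and set a bit in a probe-side bitmap whenever a right row matches. This is a minimal illustration assuming Vec<i64> columns and a callback (emit) in place of building output batches; tiled_join, right_rows_where, left_chunk, right_chunk and emit are hypothetical names, not DataFusion APIs.

```rust
/// Hypothetical sketch of proposal item 1: evaluate the Cartesian product one
/// tile (a left slice against a right slice) at a time, so at most
/// `left_chunk * right_chunk` candidate pairs are considered together.
/// Matched pairs go to `emit` (what Inner/Right joins would output per tile);
/// RightSemi/RightAnti would ignore them and use only the returned bitmap.
fn tiled_join(
    left: &[i64],
    right: &[i64],
    left_chunk: usize,
    right_chunk: usize,
    filter: impl Fn(i64, i64) -> bool,
    mut emit: impl FnMut(usize, usize),
) -> Vec<bool> {
    assert!(left_chunk > 0 && right_chunk > 0, "chunk sizes must be positive");
    // probe_side_bitmap[r] becomes true once right row r has a match.
    let mut probe_side_bitmap = vec![false; right.len()];
    for l_start in (0..left.len()).step_by(left_chunk) {
        let l_end = (l_start + left_chunk).min(left.len());
        for r_start in (0..right.len()).step_by(right_chunk) {
            let r_end = (r_start + right_chunk).min(right.len());
            // Only the pairs of this tile are evaluated in this step.
            for l in l_start..l_end {
                for r in r_start..r_end {
                    if filter(left[l], right[r]) {
                        probe_side_bitmap[r] = true;
                        emit(l, r);
                    }
                }
            }
        }
    }
    probe_side_bitmap
}

/// After all tiles: `matched = true` gives the RightSemi output rows,
/// `matched = false` gives the RightAnti rows (and, null-padded, the extra
/// rows of a Right join).
fn right_rows_where(bitmap: &[bool], matched: bool) -> Vec<usize> {
    (0..bitmap.len()).filter(|&i| bitmap[i] == matched).collect()
}
```

Peak intermediate state is bounded by the tile size rather than by the product of the two batch sizes; the set of emitted pairs is the same as in the untiled loop, only the order differs.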

(We can implement items 2 and 3 first.)
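Items 2 and 3 can likewise be sketched over the simplified model: filter_indices_chunked evaluates the filter over at most chunk index pairs at a time and concatenates the surviving indices, and OutputCursor is an iterator that materializes at most batch_size output rows per call to next. Both names, and the use of a plain Iterator in place of the operator's async RecordBatch stream, are assumptions for illustration only.

```rust
/// Hypothetical sketch of proposal item 2: evaluate the filter over at most
/// `chunk` index pairs at a time instead of one huge intermediate batch.
fn filter_indices_chunked(
    left: &[i64],
    right: &[i64],
    left_idx: &[u64],
    right_idx: &[u32],
    chunk: usize,
    filter: impl Fn(i64, i64) -> bool,
) -> (Vec<u64>, Vec<u32>) {
    assert!(chunk > 0 && left_idx.len() == right_idx.len());
    let (mut kept_left, mut kept_right) = (Vec::new(), Vec::new());
    for (ls, rs) in left_idx.chunks(chunk).zip(right_idx.chunks(chunk)) {
        // Small intermediate "batch" for this slice: at most `chunk` rows.
        let intermediate: Vec<(i64, i64)> = ls
            .iter()
            .zip(rs)
            .map(|(&l, &r)| (left[l as usize], right[r as usize]))
            .collect();
        // Filter this slice and append (concatenate) its surviving indices.
        for (i, &(lv, rv)) in intermediate.iter().enumerate() {
            if filter(lv, rv) {
                kept_left.push(ls[i]);
                kept_right.push(rs[i]);
            }
        }
    }
    (kept_left, kept_right)
}

/// Hypothetical sketch of proposal item 3: a cursor over the matched left
/// indices that builds at most `batch_size` output rows per call.
struct OutputCursor<'a> {
    left: &'a [i64],
    left_idx: Vec<u64>,
    pos: usize,
    batch_size: usize,
}

impl<'a> OutputCursor<'a> {
    fn new(left: &'a [i64], left_idx: Vec<u64>, batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be positive");
        Self { left, left_idx, pos: 0, batch_size }
    }
}

impl Iterator for OutputCursor<'_> {
    type Item = Vec<i64>;

    fn next(&mut self) -> Option<Vec<i64>> {
        if self.pos >= self.left_idx.len() {
            return None; // all matched rows have been emitted
        }
        let end = (self.pos + self.batch_size).min(self.left_idx.len());
        // Gather output values only for the slice [pos, end).
        let batch: Vec<i64> = self.left_idx[self.pos..end]
            .iter()
            .map(|&i| self.left[i as usize])
            .collect();
        self.pos = end;
        Some(batch)
    }
}
```

Because each slice is filtered independently, the matched indices do not depend on the chunk size; only the peak size of the intermediate data does.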



Labels: enhancement (New feature or request)
