Description
Is your feature request related to a problem or challenge?
The current Nested Loop Join implementation follows this simplified logic:
1. Buffer the Build Side: All data from the left (build) side of the join is collected and held in memory.
2. Iterate the Probe Side: The operator iterates through the right (probe) side, processing one `RecordBatch` at a time.
3. Process Batch Pairs: For each buffered `RecordBatch` from the left side and the incoming `RecordBatch` from the right side, the core join logic is executed:
   i. Create the Cartesian product of the two input batches and apply the filter -> `(left_side_indices, right_side_indices)`
   ii. `adjust_indices_by_join_type`
   iii. `build_batch_from_indices(left_side_indices, right_side_indices)`
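The per-pair step above can be modeled roughly as follows. This is a minimal sketch that uses plain `i64` slices in place of Arrow batches and assumes an inner join; `join_batch_pair` and `predicate` are hypothetical names for illustration, not DataFusion APIs.

```rust
/// Simplified model of processing one (left batch, right batch) pair.
/// Plain `i64` slices stand in for `RecordBatch`es; this is not DataFusion code.
/// Returns the number of index pairs materialized and the joined rows.
fn join_batch_pair(
    left: &[i64],
    right: &[i64],
    predicate: impl Fn(i64, i64) -> bool,
) -> (usize, Vec<(i64, i64)>) {
    // Step i: materialize every (left, right) index pair up front.
    // Both vectors have length left.len() * right.len().
    let mut left_indices = Vec::with_capacity(left.len() * right.len());
    let mut right_indices = Vec::with_capacity(left.len() * right.len());
    for l in 0..left.len() {
        for r in 0..right.len() {
            left_indices.push(l);
            right_indices.push(r);
        }
    }
    let materialized = left_indices.len();

    // Still step i: evaluate the filter over the full set of pairs at once.
    let kept: Vec<(usize, usize)> = left_indices
        .iter()
        .copied()
        .zip(right_indices.iter().copied())
        .filter(|&(l, r)| predicate(left[l], right[r]))
        .collect();

    // Step ii (adjusting indices by join type) is a no-op for an inner join.
    // Step iii: build the output rows from the surviving index pairs.
    let rows = kept.iter().map(|&(l, r)| (left[l], right[r])).collect();
    (materialized, rows)
}

fn main() {
    let left: Vec<i64> = (0..4).collect();
    let right: Vec<i64> = (0..3).collect();
    let (materialized, rows) = join_batch_pair(&left, &right, |a, b| a + b < a * b);
    println!("materialized {} index pairs, kept {} rows", materialized, rows.len());
}
```

Even in this toy model, the index vectors grow with the product of the two batch sizes before any filtering happens.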
It has the following problems:
- It puts all indices of the Cartesian product of the two input batches in memory, with a length of `left_batch.num_rows() * right_batch.num_rows()`.
- It may create an extremely large `RecordBatch` at once:
  - In 3.i, if the query has a filter, it calls `build_batch_from_indices(left_side_indices, right_side_indices) -> RecordBatch`, which returns `left_side_indices.len() * right_side_indices.len()` rows.
  - In 3.iii, it returns at most `left_side_indices.len() * right_side_indices.len()` rows.
I added some logging to show this:
> select t1.value from range(40960) t1 join range(8192) t2 on t1.value + t2.value < t1.value * t2.value;
[datafusion/physical-plan/src/joins/utils.rs:864:5] intermediate_batch.num_rows() = 335544320
[datafusion/physical-plan/src/joins/utils.rs:864:5] intermediate_batch.get_array_memory_size() = 5368709312
[datafusion/physical-plan/src/joins/nested_loop_join.rs:906:17] result.num_rows() = 335446019
[datafusion/physical-plan/src/joins/nested_loop_join.rs:906:17] result.get_array_memory_size() = 2683568248
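The logged numbers can be sanity-checked with some quick arithmetic. The sketch below assumes that `range` yields 8-byte integer values, that the intermediate batch carries the two columns used by the filter, and that the result carries one column; those widths are assumptions, and the helper names are hypothetical.

```rust
/// Number of rows in the Cartesian product of two inputs.
fn cartesian_rows(left_rows: u64, right_rows: u64) -> u64 {
    left_rows * right_rows
}

/// Raw data size of `rows` rows with `columns` fixed-width columns.
/// Ignores validity bitmaps, padding, and allocator overhead.
fn fixed_width_bytes(rows: u64, columns: u64, bytes_per_value: u64) -> u64 {
    rows * columns * bytes_per_value
}

fn main() {
    // Sizes taken from the query above: range(40960) joined with range(8192).
    let rows = cartesian_rows(40_960, 8_192);
    // Assumption: two 8-byte columns (t1.value, t2.value) in the intermediate batch.
    let intermediate = fixed_width_bytes(rows, 2, 8);
    // Assumption: one 8-byte column (t1.value) in the result; row count from the log.
    let result = fixed_width_bytes(335_446_019, 1, 8);
    println!("intermediate rows  = {}", rows);
    println!("intermediate bytes = {}", intermediate);
    println!("result bytes       = {}", result);
}
```

Under these assumptions the intermediate batch holds 5 GiB of raw values, within a few hundred bytes of the logged `get_array_memory_size()`. The small remainder is presumably buffer padding or similar overhead.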
Describe the solution you'd like
1. Process the Cartesian Product Incrementally. At any given time, only generate the Cartesian product for a slice of the left batch against a slice of the right batch.
   - Special Handling for Right Joins: For Right, RightSemi, and RightAnti joins, a `probe_side_bitmap` must be maintained. This bitmap is populated as each chunk is processed to track which rows from the right side have found a match. After all chunks have been evaluated, this bitmap is used to generate the final output for the unmatched right-side rows.
2. Limit `intermediate_batch` Size During Filtering. When applying the join filter in `apply_join_filter_to_indices`, avoid creating a single, massive `intermediate_batch` for evaluation. Instead, process the indices in batches:
   - Iteratively build smaller intermediate batches by calling `build_batch_from_indices` on slices of the input indices (e.g., `build_indices.slice(i, N)` and `probe_indices.slice(i, N)`).
   - Apply the filter expression to each small intermediate batch.
   - Concatenate the filtered results from each chunk to produce the final set of matched indices.
3. Yield Partial Batches on Demand. On each call, the stream uses its cursor to process the next chunk of indices (e.g., from `i` to `i + N`), then builds and returns a small `RecordBatch` from only that slice.
(we can do 2, 3 first)
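To make the proposal concrete, here is a rough sketch of how the three ideas could fit together. It models batches as `i64` slices and the stream as a plain `Iterator`. `ChunkedNlj`, `chunk`, and `right_matched` are hypothetical names, with `right_matched` playing the role of `probe_side_bitmap`; a real implementation would operate on Arrow arrays inside the existing operator.

```rust
/// Hypothetical chunked nested-loop join over two in-memory columns.
/// Each call to `next` evaluates at most `chunk` index pairs.
struct ChunkedNlj<'a, F> {
    left: &'a [i64],
    right: &'a [i64],
    predicate: F,
    chunk: usize,             // the "N" from the description
    cursor: usize,            // position in the flattened left x right index space
    right_matched: Vec<bool>, // stand-in for probe_side_bitmap
}

impl<'a, F: Fn(i64, i64) -> bool> ChunkedNlj<'a, F> {
    fn new(left: &'a [i64], right: &'a [i64], predicate: F, chunk: usize) -> Self {
        Self {
            left,
            right,
            predicate,
            chunk: chunk.max(1), // guard against a zero-sized chunk
            cursor: 0,
            right_matched: vec![false; right.len()],
        }
    }

    /// Right-side rows that never matched; meaningful once iteration is done.
    fn unmatched_right(&self) -> Vec<i64> {
        self.right
            .iter()
            .zip(&self.right_matched)
            .filter(|(_, matched)| !**matched)
            .map(|(value, _)| *value)
            .collect()
    }
}

impl<'a, F: Fn(i64, i64) -> bool> Iterator for ChunkedNlj<'a, F> {
    type Item = Vec<(i64, i64)>;

    fn next(&mut self) -> Option<Self::Item> {
        let total = self.left.len() * self.right.len();
        if self.cursor >= total {
            return None;
        }
        let end = (self.cursor + self.chunk).min(total);
        let mut out = Vec::new();
        // Only the pairs in cursor..end are generated and filtered in this call.
        for pos in self.cursor..end {
            let (l, r) = (pos / self.right.len(), pos % self.right.len());
            if (self.predicate)(self.left[l], self.right[r]) {
                self.right_matched[r] = true;
                out.push((self.left[l], self.right[r]));
            }
        }
        self.cursor = end;
        Some(out) // may be empty when no pair in this chunk passes the filter
    }
}

fn main() {
    let left: Vec<i64> = (0..4).collect();
    let right: Vec<i64> = (0..3).collect();
    let mut join = ChunkedNlj::new(&left, &right, |a, b| a + b < a * b, 5);
    for batch in join.by_ref() {
        println!("batch with {} rows", batch.len());
    }
    println!("unmatched right rows: {:?}", join.unmatched_right());
}
```

With this shape, peak memory per call is bounded by `chunk` rather than by the product of the two batch sizes. One design question the sketch leaves open is whether empty chunks should be skipped or coalesced before being returned to the consumer.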
Describe alternatives you've considered
No response
Additional context
No response