Is your feature request related to a problem or challenge?
A conventional nested loop join (NLJ) consumes constant memory: it holds one batch from the left side at a time and rescans the right side for each left batch. DataFusion's implementation is instead optimized for execution time. It buffers the entire left input in memory so that the right input needs to be scanned only once, which means it can fail in memory-constrained cases.
The following pseudocode contrasts the two approaches:
Normal NLJ
    for l_batch in nlj.left.get_next_batch():
        for r_batch in nlj.right.get_next_batch():
            let matched_entries = match(l_batch, r_batch);
            output(matched_entries);
        nlj.right.reset(); // Let right side of the join restart and scan again
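To make the memory behavior of the conventional loop concrete, here is a minimal Rust sketch. It assumes a toy model that is not DataFusion's API: a "batch" is a hypothetical `Batch = Vec<i64>` of join keys, the right input is a slice that can simply be iterated again (standing in for `nlj.right.reset()`), and `normal_nlj` is an illustrative name.

```rust
/// Hypothetical toy model: a batch is a Vec of integer join keys.
type Batch = Vec<i64>;

/// Conventional nested loop join: only one left batch is in use at a time,
/// and the whole right input is rescanned once per left batch.
fn normal_nlj<F>(left: &[Batch], right: &[Batch], pred: F) -> Vec<(i64, i64)>
where
    F: Fn(i64, i64) -> bool,
{
    let mut output = Vec::new();
    for l_batch in left {
        // Each pass over `right` models a fresh scan after a reset.
        for r_batch in right {
            for &l in l_batch {
                for &r in r_batch {
                    if pred(l, r) {
                        output.push((l, r));
                    }
                }
            }
        }
    }
    output
}

fn main() {
    let left = vec![vec![1, 2], vec![3]];
    let right = vec![vec![2, 3], vec![1]];
    let out = normal_nlj(&left, &right, |l, r| l == r);
    println!("{:?}", out); // prints [(2, 2), (1, 1), (3, 3)]
}
```

The price of constant memory is that the right side is scanned once per left batch, so here it is read twice.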
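DataFusion's NLJ implementation
    // Phase 1: buffer the entire left side in memory
    for l_batch in nlj.left.get_next_batch():
        buffered_left_batches.push(l_batch);

    // Phase 2: scan the right side once, probing all buffered left batches
    for r_batch in nlj.right.get_next_batch():
        let matched_entries = match(buffered_left_batches, r_batch);
        output(matched_entries);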
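The buffered strategy can be sketched in the same hypothetical toy model (`Batch` and `buffered_nlj` are illustrative names, not DataFusion code). It also returns the number of buffered left rows, to show that peak memory grows with the size of the whole left input.

```rust
/// Hypothetical toy model: a batch is a Vec of integer join keys.
type Batch = Vec<i64>;

/// Buffered nested loop join: materialize every left batch first, then make
/// a single pass over the right input. Returns the output and the number of
/// left rows held in memory at once.
fn buffered_nlj<F>(left: &[Batch], right: &[Batch], pred: F) -> (Vec<(i64, i64)>, usize)
where
    F: Fn(i64, i64) -> bool,
{
    // Phase 1: buffer the entire left side.
    let mut buffered_left: Vec<Batch> = Vec::new();
    for l_batch in left {
        buffered_left.push(l_batch.clone());
    }
    let buffered_rows: usize = buffered_left.iter().map(|b| b.len()).sum();

    // Phase 2: scan the right side exactly once.
    let mut output = Vec::new();
    for r_batch in right {
        for l_batch in &buffered_left {
            for &l in l_batch {
                for &r in r_batch {
                    if pred(l, r) {
                        output.push((l, r));
                    }
                }
            }
        }
    }
    (output, buffered_rows)
}

fn main() {
    let left = vec![vec![1, 2], vec![3]];
    let right = vec![vec![2, 3], vec![1]];
    let (out, rows) = buffered_nlj(&left, &right, |l, r| l == r);
    println!("{:?}, {} left rows buffered", out, rows);
}
```

The join result is the same set of pairs as the conventional loop, only in a different order; the difference is that all left rows must fit in memory simultaneously.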
Related code:
Describe the solution you'd like
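When the memory limit is reached while buffering the left-side input, stop buffering and probe the right side against the left batches buffered so far. Once the right side is exhausted, release that buffer, buffer the next chunk of the remaining left-side batches, restart the right-side scan from the beginning, and probe again. Repeat until the left side is fully consumed. This degrades gracefully to the conventional NLJ under tight memory, while keeping the current single-scan behavior when the whole left side fits.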
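The proposal is essentially a block nested loop join. The following is a minimal sketch in the same toy model, assuming a hypothetical `max_buffered_rows` row count as a stand-in for the real memory limit (DataFusion tracks memory in bytes through its memory pool, which this sketch does not model). `block_nlj` is an illustrative name, not an existing API.

```rust
/// Hypothetical toy model: a batch is a Vec of integer join keys.
type Batch = Vec<i64>;

/// Block nested loop join: buffer left batches up to a budget, probe the
/// whole right side against that chunk, then repeat with the next chunk.
/// Returns the output and how many times the right side was scanned.
fn block_nlj<F>(left: &[Batch], right: &[Batch], max_buffered_rows: usize, pred: F) -> (Vec<(i64, i64)>, usize)
where
    F: Fn(i64, i64) -> bool,
{
    let mut output = Vec::new();
    let mut right_scans = 0;
    let mut left_iter = left.iter().peekable();

    while left_iter.peek().is_some() {
        // Buffer left batches until the next one would exceed the budget.
        // The first batch is always accepted so every round makes progress.
        let mut buffered: Vec<&Batch> = Vec::new();
        let mut buffered_rows = 0;
        while let Some(&next) = left_iter.peek() {
            if !buffered.is_empty() && buffered_rows + next.len() > max_buffered_rows {
                break;
            }
            buffered_rows += next.len();
            buffered.push(next);
            left_iter.next();
        }

        // Probe: scan the right side from the beginning against this chunk.
        right_scans += 1;
        for r_batch in right {
            for l_batch in &buffered {
                for &l in l_batch.iter() {
                    for &r in r_batch {
                        if pred(l, r) {
                            output.push((l, r));
                        }
                    }
                }
            }
        }
        // `buffered` goes out of scope here, releasing this chunk.
    }
    (output, right_scans)
}

fn main() {
    let left = vec![vec![1, 2], vec![3]];
    let right = vec![vec![2, 3], vec![1]];
    // A 2-row budget splits the left side into two chunks.
    let (out, scans) = block_nlj(&left, &right, 2, |l, r| l == r);
    println!("{:?} after {} right-side scans", out, scans);
}
```

With a budget large enough for the whole left side, this performs a single right-side scan like the current implementation; with a smaller budget it trades extra right-side scans for bounded memory.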
Describe alternatives you've considered
No response
Additional context
No response