feat: emitting partial join results in HashJoinStream
#8020
Conversation
I will review it soon. Can you share the performance changes with https://github.com/apache/arrow-datafusion/tree/main/benchmarks?
Force-pushed a487a0e to 5acdd6d
@metesynnada, thanks! Results for tpch_mem (10 iterations, scale=1) and tpch (5 iterations, scale=10) are:
I guess before marking this PR as "ready for review" I'll add some unit tests for hash join and investigate the reasons for the q18 and q21 slowdowns.
Thank you @korowa and @metesynnada
I am also running the performance benchmarks on a GCP machine and I will post the results here when they are ready
I wonder if it would be possible to encapsulate some of these functions in a struct, something like

struct JoinOutputBuilder {
    ...
}

That might allow a more efficient encoding of the current join state rather than having to pass around Range<usize>, perhaps 🤔
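For illustration, a minimal sketch of what such a builder might look like -- every name and field here is hypothetical, not part of the PR:

use std::mem;

/// Hypothetical builder that owns the in-progress output indices, so the
/// probe/adjust functions don't need to pass Range<usize> around.
struct JoinOutputBuilder {
    /// build-side indices matched so far for the pending output batch
    build_indices: Vec<u64>,
    /// probe-side indices matched so far for the pending output batch
    probe_indices: Vec<u32>,
    /// maximum number of rows per emitted batch
    batch_size: usize,
}

impl JoinOutputBuilder {
    fn new(batch_size: usize) -> Self {
        Self { build_indices: Vec::new(), probe_indices: Vec::new(), batch_size }
    }

    /// true once the pending output has reached the batch-size limit
    fn is_full(&self) -> bool {
        self.build_indices.len() >= self.batch_size
    }

    fn push(&mut self, build_idx: u64, probe_idx: u32) {
        self.build_indices.push(build_idx);
        self.probe_indices.push(probe_idx);
    }

    /// take the accumulated indices, leaving the builder empty for reuse
    fn take(&mut self) -> (Vec<u64>, Vec<u32>) {
        (mem::take(&mut self.build_indices), mem::take(&mut self.probe_indices))
    }
}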
0 0 3 0
0 0 6 0
0 0 20 0
1 3 95 0
why did this output change? is it because the order on t2.a is a tie?
Well, yes -- this query result is ordered only by t2, with random order on t1. The current behaviour of the index-matching function is to iterate over inverted probe-side indices and attach build-side indices to them (the HashMap + Vec data structure also emits build-side indices in reverse order), and after the whole probe side is matched, the resulting arrays are inverted again -- this returns the indices of both sides in their natural order.
In the case of partial output, I can't see any option besides iterating the probe side in natural order (otherwise record order would be broken, as there is no "full batch" anymore to re-sort), but at the same time the build side is still stored in the same data structure in reverse order.
So it's a side effect -- hash join still maintains probe-side input order, but no longer build-side order (I guess it could potentially be restored by tweaking the collect_build_side function) -- that's why the t1 order in this query result is now inverted.
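To make the reverse-order effect concrete, here is a minimal sketch of the chain walk described above, assuming the HashMap-plus-next-array layout (simplified -- the real JoinHashMap also verifies that key values actually match):

use std::collections::HashMap;

/// Assumed layout: `map` points at the LAST build row inserted for a hash
/// (1-based, 0 = miss) and `next` chains back to earlier rows with the same
/// hash, so matches are emitted newest-first.
fn lookup_chain(map: &HashMap<u64, u64>, next: &[u64], hash: u64) -> Vec<u64> {
    let mut out = Vec::new();
    let mut idx = map.get(&hash).copied().unwrap_or(0); // 1-based chain head
    while idx != 0 {
        out.push(idx - 1); // back to a 0-based build row index
        idx = next[(idx - 1) as usize]; // previously inserted row, 0 = chain end
    }
    out // build rows inserted as [2, 5, 7] come out as [7, 5, 2]
}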
I don't think the build side order is important -- maybe we should just update the test so it has a well defined deterministic output 🤔
I believe preserving the order of the build side is important for further aggregations. We can avoid unnecessary sorting by leveraging the build-side lexical order. This pattern is common in DataFusion and warrants investing in order preservation.
What is the cost of maintaining the build side order?
I agree that build-side order is not that important (at least in the majority of cases -- it will be affected by probe-side ordering anyway), but on the other hand it's reasonable not to break current behaviour.
At first glance, the cost is an increase in join_build_time caused by switching the current JoinHashMap building process from LIFO (last value stored in map) to FIFO (first value stored in map) -- it would probably require more traversals over JoinHashMap.next to place all the elements, and would affect joins with a high share of non-unique values on the build side.
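A rough sketch of the two insertion strategies under the assumed chained layout (same hypothetical model as above; `next` is pre-sized to the number of build rows), illustrating why FIFO costs extra next traversals for duplicate keys:

use std::collections::HashMap;

struct BuildSideMap {
    map: HashMap<u64, u64>, // hash -> 1-based index of the chain head
    next: Vec<u64>,         // chain to the neighbouring row, 0 = chain end
}

impl BuildSideMap {
    /// Current behaviour (LIFO): O(1) -- the new row becomes the chain head,
    /// which is why lookups emit build indices in reverse insertion order.
    fn insert_lifo(&mut self, hash: u64, row: usize) {
        let prev = self.map.insert(hash, row as u64 + 1).unwrap_or(0);
        self.next[row] = prev;
    }

    /// FIFO alternative: preserves natural order, but must walk to the chain
    /// tail, paying extra `next` traversals for highly duplicated keys.
    fn insert_fifo(&mut self, hash: u64, row: usize) {
        match self.map.get(&hash).copied() {
            None => {
                self.map.insert(hash, row as u64 + 1);
            }
            Some(mut idx) => {
                while self.next[(idx - 1) as usize] != 0 {
                    idx = self.next[(idx - 1) as usize];
                }
                self.next[(idx - 1) as usize] = row as u64 + 1;
            }
        }
        // self.next[row] stays 0, marking the new chain end
    }
}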
@metesynnada, my bad -- I didn't notice those special cases in calculate_join_output_ordering for inner joins.
I guess I'll do some research on this one and the ideas related to it.
Thank you!
Thank you @alamb, but I don't think this PR is worth benchmarking yet -- after running the current unit tests with different batch sizes, I've found that arguments for
Thank you. This particular issue has become a higher priority for us at InfluxData (because we are sometimes hitting it internally). I may spend some time working on it myself this upcoming week (e.g. maybe encapsulating the join output generation into a struct as a first step 🤔).
Force-pushed cba8b0f to 447180d
Force-pushed 447180d to 5bdfe3f
@alamb, got it! Anyway, the current version works as expected, so I'll mark it as ready for review. Regarding logic / encapsulation -- I've moved all the index-tracking work to HashJoinStream.
Thanks @korowa -- I'll review this PR later today.
Thank you @korowa. I reviewed the code and tests carefully and it is looking really nice to me.
I left some style suggestions, but I don't think anything is required prior to merge. However, given that I am not as familiar with the join code, it would be awesome if @liukun4515, @mingmwang, or @Dandandan could also give this PR a review to ensure we aren't missing something.
I am running the tpch benchmark tests now to confirm it doesn't introduce any performance regression.
I am also testing this against our internal reproducer to see if I can verify if this PR fixes the issue we were seeing
Again, really nice work
#[template]
#[rstest]
fn batch_sizes(#[values(8192, 10, 5, 2, 1)] batch_size: usize) {}
This is a nice way to test the logic
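For readers unfamiliar with the pattern: this template appears to come from the rstest_reuse crate, where #[apply] expands a test once per templated value. A usage sketch (the test name and body are placeholders, not the PR's actual tests):

use rstest::rstest;
use rstest_reuse::{apply, template};

#[template]
#[rstest]
fn batch_sizes(#[values(8192, 10, 5, 2, 1)] batch_size: usize) {}

// each application expands into five cases, one per batch size
#[apply(batch_sizes)]
fn hypothetical_join_test(batch_size: usize) {
    assert!(batch_size > 0); // a real test would run the join at this batch_size
}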
@@ -747,6 +753,97 @@ where
    Ok(())
}

// State for storing left/right side indices used for partial batch output
Maybe as a follow-on PR we can move this structure into its own module (perhaps something like datafusion/physical-plan/src/joins/join_output.rs).
(0..row_count)
    .filter_map(|idx| (!bitmap.get_bit(idx)).then_some(idx as u32))
(range)
    .filter_map(|idx| (!bitmap.get_bit(idx - offset)).then_some(idx as u32))
    .collect::<UInt32Array>()
}

/// Get unmatched and deduplicated indices
pub(crate) fn get_anti_u64_indices(
get_anti_u64_indices is always called with a range of 0.. -- what is the reason for changing it to take a Range<usize>?
The initial intention was to keep the function signatures uniform between get_anti_indices and get_anti_u64_indices, but (since they duplicate each other) I've now made get_anti_indices a generic function -- so there is no need for the u64 variation anymore.
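A sketch of what the unified generic function might look like (hypothetical signature, assuming the bitmap is offset-relative to range.start as in the diff above; the actual DataFusion code may differ):

use std::ops::Range;
use arrow::array::{BooleanBufferBuilder, PrimitiveArray};
use arrow::datatypes::ArrowPrimitiveType;

/// One generic definition covering the former u32 (probe) and u64 (build)
/// variants: emit every index in `range` whose bit is NOT set (unmatched rows).
fn get_anti_indices<T: ArrowPrimitiveType>(
    range: Range<usize>,
    bitmap: &BooleanBufferBuilder,
) -> PrimitiveArray<T>
where
    T::Native: TryFrom<usize>,
{
    let offset = range.start;
    range
        .filter_map(|idx| {
            (!bitmap.get_bit(idx - offset))
                .then(|| T::Native::try_from(idx).ok())
                .flatten()
        })
        .collect::<PrimitiveArray<T>>()
}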
(0..row_count)
    .filter_map(|idx| (bitmap.get_bit(idx)).then_some(idx as u32))
(range)
    .filter_map(|idx| (bitmap.get_bit(idx - offset)).then_some(idx as u32))
    .collect::<UInt32Array>()
}

/// Get matched and deduplicated indices
pub(crate) fn get_semi_u64_indices(
Likewise, get_semi_u64_indices never seems to be called with a range that doesn't start at zero.
Same as above
matched_indices_updated: bool,
// tracking last joined probe side index seen for further indices adjustment
last_joined_probe_index: Option<usize>,
// tracking last joined probe side index seen for further indices adjustment
This comment is same as above -- can you clarify what the difference is between "last" and "prev"?
The comment has been updated.
prev_joined_probe_index: Option<usize>,
}

impl HashJoinStreamState {
Another way to represent a state machine like this in Rust is via an enum. The upside is that it makes it more explicit what fields are used in what states and what transitions are allowed
Something like this perhaps
enum HashJoinOutputState {
    /// The hash table is still being built
    Hashing,
    /// The hash table has been built, and a batch of `probe_rows` is being output,
    /// but nothing has been output yet
    Begin {
        probe_rows: usize
    },
    /// Have output up to `last_matched_indices`
    Output {
        // saved probe-build indices to resume matching from
        last_matched_indices: Option<(usize, usize)>,
    }
    /// ....
}
I strongly advise making this an enum; this is a common approach in other executors as well.
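As a sketch of the point about explicit transitions: with an enum like the one above, a single match can encode which state changes are legal (state names reused from the suggestion; the transition logic itself is illustrative only):

enum HashJoinOutputState {
    Hashing,
    Begin { probe_rows: usize },
    Output { last_matched_indices: Option<(usize, usize)> },
    Done,
}

impl HashJoinOutputState {
    /// Called after a (possibly partial) batch is emitted; `resume_at` is
    /// Some when the probe batch still has unprocessed matches.
    fn on_output_emitted(self, resume_at: Option<(usize, usize)>) -> Self {
        match self {
            // only Begin/Output may emit; the resume position decides what's next
            Self::Begin { .. } | Self::Output { .. } => match resume_at {
                Some(indices) => Self::Output { last_matched_indices: Some(indices) },
                None => Self::Done,
            },
            // no other state is allowed to emit output
            other => other,
        }
    }
}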
Here are my benchmark results. My conclusion is that there is no significant performance change (the faster/slower entries are within the realm of noise). I will try running at a larger scale to see if anything shows up.
BTW, I have verified that this change fixes our problem upstream (memory no longer grows without bound).
Thank you again @korowa
It seems there is room for improvement in the code. One suggestion is to handle the state handling and output-producing mechanisms in common structs, which could make the code easier to understand. Another idea is an explanatory ASCII diagram for future maintainers. Additionally, for SymmetricHashJoin, it is recommended not to use HashJoinStreamState inside the SHJ file. Instead, it would be cleaner to copy the previous code into SHJ and keep them separate.
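For example, a rough diagram of the stream's states as they appear in this thread might look like the following (transitions are approximate, and the terminal state name is assumed):

WaitBuildSide ──► FetchProbeBatch ──► ProcessProbeBatch ──► Completed
                        ▲                  │       │
                        │ batch fully      │       │ output reached batch_size:
                        └─ joined ─────────┘       └─ emit partial batch, stay in
                                                      ProcessProbeBatch and resume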
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
@alamb @metesynnada thank you! I've addressed some of the comments; just to sum up, the remaining (major) ones I'm going to work on:
Force-pushed a261e44 to 8f79fc6
I believe this version is ready for review. Could you please take a look when you have some time? Just in case, the tpch_mem results for this version are:
Thanks @korowa - I will try to review this carefully tomorrow (though it is a holiday, so I may not get to it until Tuesday).
Thank you @korowa -- I went through this PR carefully and it looks really nice to me. I had some small style / comment suggestions, but my major concern is the reported slowdown.
I am running the tpch_mem benchmarks myself now to see if I can reproduce your results, and I left some thoughts about potential places the regression may have come from.
@metesynnada can you please review this PR again to ensure it preserves the invariants needed by SymmetricHashJoin? All the tests are passing, so it seems to me like we should be good, but a second pair of eyes would be valuable.
Also cc @Dandandan for your thoughts
Here are my benchmark results. They are similar to yours, but less pronounced:
--------------------
Benchmark tpch_mem.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query ┃ main_base ┃ hash_join_batch_size ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1 │ 217.67ms │ 211.80ms │ no change │
│ QQuery 2 │ 48.00ms │ 47.94ms │ no change │
│ QQuery 3 │ 83.12ms │ 80.45ms │ no change │
│ QQuery 4 │ 76.42ms │ 77.78ms │ no change │
│ QQuery 5 │ 128.00ms │ 128.13ms │ no change │
│ QQuery 6 │ 17.59ms │ 18.28ms │ no change │
│ QQuery 7 │ 313.54ms │ 336.00ms │ 1.07x slower │
│ QQuery 8 │ 85.88ms │ 86.11ms │ no change │
│ QQuery 9 │ 129.79ms │ 131.18ms │ no change │
│ QQuery 10 │ 160.24ms │ 162.07ms │ no change │
│ QQuery 11 │ 36.02ms │ 35.37ms │ no change │
│ QQuery 12 │ 71.54ms │ 73.88ms │ no change │
│ QQuery 13 │ 82.46ms │ 87.87ms │ 1.07x slower │
│ QQuery 14 │ 27.55ms │ 27.73ms │ no change │
│ QQuery 15 │ 65.98ms │ 66.24ms │ no change │
│ QQuery 16 │ 47.58ms │ 47.02ms │ no change │
│ QQuery 17 │ 176.15ms │ 175.04ms │ no change │
│ QQuery 18 │ 446.95ms │ 487.56ms │ 1.09x slower │
│ QQuery 19 │ 64.93ms │ 63.96ms │ no change │
│ QQuery 20 │ 119.30ms │ 116.42ms │ no change │
│ QQuery 21 │ 367.59ms │ 369.21ms │ no change │
│ QQuery 22 │ 31.19ms │ 30.48ms │ no change │
└──────────────┴───────────┴──────────────────────┴──────────────┘
struct ProcessProbeBatchState {
    /// Current probe-side batch
    batch: RecordBatch,
    /// Matching offset
What does this match? The build side offset that is matched?
I've updated the comment -- it now indicates that this attribute is used as a starting offset for lookups against the join hashmap.
matched_probe.as_slice_mut().reverse();
matched_build.as_slice_mut().reverse();
let mut hashes_buffer = vec![0; probe_batch.num_rows()];
Maybe this is one source of potential slowdown -- the hashes buffer is reallocated each time? Maybe we could change it to be reused as it was previously?
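A minimal sketch of the reuse pattern being suggested (field and variable names assumed):

/// Reset a buffer kept in the stream state instead of allocating
/// `vec![0; n]` on every lookup iteration.
fn prepare_hashes_buffer(hashes_buffer: &mut Vec<u64>, num_rows: usize) {
    hashes_buffer.clear(); // keeps the existing capacity
    hashes_buffer.resize(num_rows, 0); // zero-fill up to the batch length
}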
Here is a proposal of how to do so: korowa#169
Looks reasonable -- let me merge it and check whether we can go even further and compute hashes right after fetching the probe batch (once per batch, not per lookup iteration) -- if hashing is the major source of perf degradation, it should help significantly.
Precalculating hash values still doesn't recover the performance; I guess I need a bit more time to investigate. 😞
I will plan to do some profiling today and report my findings
I spent some time profiling and it looks like the additional time is spent in get_matched_indices_with_limit_offset (likely unsurprising). I wonder if there are some obvious wins there (maybe use / reuse Vecs rather than buffer builders to avoid so many reallocations?)
Methodology
Comparing be361fd (which is the merge-base) to b631c1e
Build like this:
git checkout `git merge-base HEAD apache/main` # be361fdd8079a2f44da70f6af6e9d8eb3f7d0020
cargo build --profile release-nonlto --bin dfbench
cp target/release-nonlto/dfbench ~/Downloads/dfbench-be361fd
gh pr checkout https://github.com/apache/arrow-datafusion/pull/8020
cargo build --profile release-nonlto --bin dfbench
cp target/release-nonlto/dfbench ~/Downloads/dfbench-hash_join_batch_size
Link the data:
~/Downloads$ ln -s /Users/andrewlamb/Software/arrow-datafusion/benchmarks
Run query 18:
./dfbench-hash_join_batch_size tpch --iterations 5 --path /Users/andrewlamb/Software/arrow-datafusion/benchmarks/data/tpch_sf1 -m --format parquet --query 18
(profiler output attached for this PR and for main)
After precalculating hashes (a minor cause of the degradation, ~10ms) and removing all iterator-related constructions from the lookup function (which was indeed the major cause of the slowdown, ~30-40ms), I was able to obtain the following results for q18:
Comparing master and hash_join_batch_size
--------------------
Benchmark tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query ┃ master ┃ hash_join_batch_size ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 18 │ 508.30ms │ 510.62ms │ no change │
└──────────────┴──────────┴──────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Total Time (master) │ 508.30ms │
│ Total Time (hash_join_batch_size) │ 510.62ms │
│ Average Time (master) │ 508.30ms │
│ Average Time (hash_join_batch_size) │ 510.62ms │
│ Queries Faster │ 0 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 1 │
└─────────────────────────────────────┴──────────┘
--------------------
Benchmark tpch_mem.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query ┃ master ┃ hash_join_batch_size ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 18 │ 455.00ms │ 476.05ms │ no change │
└──────────────┴──────────┴──────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Total Time (master) │ 455.00ms │
│ Total Time (hash_join_batch_size) │ 476.05ms │
│ Average Time (master) │ 455.00ms │
│ Average Time (hash_join_batch_size) │ 476.05ms │
│ Queries Faster │ 0 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 1 │
└─────────────────────────────────────┴──────────┘
(with --iterations 50 --partitions 4 --query 18), which shows a ~20-30ms speedup over what I've seen before for tpch_mem, but I don't have an exact explanation for it yet. If you have (or come up with) any additional comments / ideas / thoughts, that would be great.
});
}
// apply join filter if exists
let (left_indices, right_indices) = if let Some(filter) = &self.filter {
I think your changes to this code have made it easier to read. Thank you
)?;

self.join_metrics.output_batches.add(1);
self.join_metrics.output_rows.add(state.batch.num_rows());
Shouldn't the output rows be the number of rows in result rather than in the entire batch? If it is the entire batch, the output rows will be counted multiple times, I think.
It definitely should, thank you.
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Reuse hashes buffer when emitting partial join results
let (initial_idx, initial_next_idx) = offset;
'probe: for (row_idx, hash_value) in iter.skip(initial_idx) {
    let index = if initial_next_idx.is_some() && row_idx == initial_idx {
        // If `initial_next_idx` is zero, then input index has been processed
We might combine this logic with next == 0 in the loop below?
I forgot to review this PR even though it was on my shortlist; I will review it tomorrow.
When the filter equality generates too much data (close to a cross join), you stop pulling data from the probe side until the probe results are calculated over a couple of steps, which is nice. I think this is a well-thought-out PR -- congrats, I bought the idea 😀.
- You should put more effort into readability, particularly in process_probe_batch: you could divide the logic into several methods, as too many things are happening in a single method.
- Maybe you can also think about moving the offset handling out of the state methods; otherwise it becomes hard to incrementally add features to join operators.
Other than that, I think the structural integrity of the algorithm is quite fascinating.
Not exactly about this PR, but in the collect_build_side method, the build_timer will restart if the left_data future returns PENDING. I missed it in the previous PR. Perhaps we can open an issue.
@@ -665,6 +668,8 @@ impl ExecutionPlan for HashJoinExec {
    reservation,
    state: HashJoinStreamState::WaitBuildSide,
    build_side: BuildSide::Initial(BuildSideInitialState { left_fut }),
    batch_size,
I am not comfortable selecting the limit as batch size since it does not consider memory. However, I am unsure of how to pick a heuristic value.
I suppose batch_size is the best fit out of all the settings DF has at this moment -- there is no such setting (AFAIK) as batch_size_bytes, or anything like adaptive batching based on memory size, so I suppose this logic is more or less aligned with other operators.
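For context, batch_size is DataFusion's existing engine-wide knob (datafusion.execution.batch_size); the partial-output limit discussed here reuses it rather than introducing a new setting. It can be set like this (API names as of DataFusion versions around this PR; exact method names may vary):

use datafusion::prelude::{SessionConfig, SessionContext};

fn make_context() -> SessionContext {
    // caps each emitted RecordBatch, including partial join output
    let config = SessionConfig::new().with_batch_size(8192);
    SessionContext::new_with_config(config)
}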
timer.done();

self.state = HashJoinStreamState::FetchProbeBatch;
if probe_batch_scanned {
For state machine handling, please take a look at the EagerJoinStream trait. It encapsulates all of the build- and probe-side data pulling and state management, which means state handling and join calculations (inside the trait's implementors) are visibly separated; it makes the code quite easy to read. You could follow a layered approach like EagerJoinStream and SHJ -- meaning that inside the method where you calculate joins, you do not alter the state.
My plan for this PR is to stare at it a bit more under the profiler and try to get back some of the lost performance, so we can feel better about merging it.
Force-pushed 343b192 to 9e353e7
I reran the benchmarks and my conclusion is that this branch now shows no change in performance 🚀
Nice work @korowa
Actually, I should say "really nice work"
Hi @korowa, I think there is a good improvement here. Do you think we can easily adapt the same mechanism to nested loop join? This mechanism should be there as well -- do you think we can do it without code duplication? If so, we can merge this PR and open an issue for nested loop join.
Good call @metesynnada -- I have filed #8952 to track this issue |
Thanks again everyone for their reviews, and @korowa for sticking with this. It is both really appreciated and a really nice PR.
Which issue does this PR close?
Closes #7848.
Rationale for this change
The ability to produce intermediate output, without accumulating the "inner" batch until the complete probe-side batch is joined, will help to avoid unpredictable memory consumption by HashJoinStream (e.g. in the case of implicit cross joins) and make such queries processable.

What changes are included in this PR?
- JoinHashMap::get_matched_indices_with_limit_offset -- a method that returns a limited number of matched index pairs from the JoinHashMap, along with the starting point for the next call of this method (see the sketch after this list).
- ProcessProbeBatchState -- required for tracking the progress of processing the current batch; the state itself is mutated once per iteration.
- build_equal_condition_join_indices is split into lookup_join_hashmap (matching & verifying that the values behind equal hashes are actually equal) and apply_join_filter_to_indices (filtering logic -- not a new function). Both HashJoinExec and SymmetricHashJoinExec now have their own implementation of matching -- at this point the difference between them is starting to become noticeable.
- adjust_indices_by_join_type and the functions it calls now accept a Range argument in order to perform partial adjustment.
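A simplified sketch of the limit/offset lookup idea at the heart of this PR (types and names are illustrative, not the actual signatures; the real code also verifies that the values behind equal hashes match):

use std::collections::HashMap;

struct ChainedJoinMap {
    map: HashMap<u64, u64>, // hash -> 1-based index of the last build row
    next: Vec<u64>,         // chain to the previous row with the same hash; 0 = end
}

impl ChainedJoinMap {
    /// Emit at most `limit` (build, probe) index pairs, returning the
    /// position to resume from when the output quota is reached.
    fn get_matched_indices_with_limit_offset(
        &self,
        probe_hashes: &[u64],
        limit: usize,
        offset: (usize, Option<u64>), // (probe row, position within its chain)
    ) -> (Vec<u64>, Vec<u32>, Option<(usize, Option<u64>)>) {
        let (mut build, mut probe) = (Vec::new(), Vec::new());
        let (start_row, start_chain) = offset;
        for (row, hash) in probe_hashes.iter().enumerate().skip(start_row) {
            // resume mid-chain on the first row, otherwise start at the chain head
            let mut idx = match (row == start_row, start_chain) {
                (true, Some(saved)) => saved,
                _ => self.map.get(hash).copied().unwrap_or(0),
            };
            while idx != 0 {
                if build.len() == limit {
                    // output quota reached: report where to pick up next time
                    return (build, probe, Some((row, Some(idx))));
                }
                build.push(idx - 1); // back to a 0-based build index
                probe.push(row as u32);
                idx = self.next[(idx - 1) as usize];
            }
        }
        (build, probe, None) // probe batch fully processed
    }
}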
Are these changes tested?
Tests for each type of join (both with and without filters) now run for multiple batch sizes, plus there are new tests validating output row/batch counts for each join type and varying batch_size.
Additionally, these modifications are covered by the join_fuzz tests.

Are there any user-facing changes?