Support for Sliding Windows Joins with Symmetric Hash Join (SHJ) #5322
….com/synnada-ai/arrow-datafusion into feature/prunable-symmetric-hash-join
- Utilize estimate_bounds without propagation for better API.
- Remove coupling between node_index & PhysicalExpr pairing and graph.
- Better commenting on symmetric hash join while using graph
….com/synnada-ai/arrow-datafusion into feature/prunable-symmetric-hash-join
Also, cargo.lock update.
….com/synnada-ai/arrow-datafusion into feature/prunable-symmetric-hash-join
@metesynnada's latest changes address all the feedback we got in the initial wave of reviews. As discussed, there will be follow-on PRs coming to improve/extend the functionality 🚀
datafusion/common/src/scalar.rs
DataType::UInt16 => ScalarValue::UInt16(Some(0)),
DataType::UInt32 => ScalarValue::UInt32(Some(0)),
DataType::UInt64 => ScalarValue::UInt64(Some(0)),
DataType::Float32 => ScalarValue::UInt64(Some(0)),
This should be float32 and float64?
Right, that's a typo -- just sent a commit to fix it as well as the new merge conflicts.
I think this PR is ready to merge -- @mingmwang if you agree perhaps you can click the button?
Thanks a lot for all the work @metesynnada and @ozankabak
I resolved the latest conflicts, @alamb this is good to go
Thanks again everyone
Benchmark runs are scheduled for baseline = 03fbf9f and contender = 3c1e4c0. 3c1e4c0 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Filed #5535 to track the work to unify interval analysis
Which issue does this PR close?
Closes #5321.
This is a large pull request, with over 5.4k additions. However, we made sure to include detailed comments and extensive testing, and the changes made break down as follows:
Cargo.lock.
Rationale for this change
Symmetric Hash Join (SHJ) extends the join capabilities of DataFusion by efficiently supporting filter expressions with order guarantees. This use case arises commonly in time-series contexts, e.g. in sliding window queries. While ordinary hash join remains the preferable option when both sources are finite, the join type can be changed to SHJ using a PipelineFixer sub-rule when both sources are unbounded.
Let’s see how important this feature is: In a typical stream processing library like Apache Flink or Apache Spark, the join operation can be performed using watermarks. Let's examine a query taken from the Apache Spark docstring:
Actually, this is part of the picture, not the whole. In theory, range-based pruning can be done with any sorted field (not just the watermark field) and with any arbitrary join filter condition that is amenable to this type of data pruning. However, Apache Spark overfits to timestamps and associates the pruning operation with a watermark. Let’s follow a different approach and examine the following query from a more general, first-principles perspective:
If sort orders of the two columns (left_sorted and right_sorted) are ascending, and the join condition is left_sorted > right_sorted - 3, and the latest value on the right input is 1234, then the left side buffer only has to keep rows where left_sorted > 1231, and any rows coming before this boundary can be dropped from the buffer. Note that this example is in no way specific; similar scenarios can manifest with a variety of orderings and join filter expressions.
Please refer to the blog post for more information.
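The pruning rule in the example above can be sketched in a few lines of Rust. This is only an illustration under simplifying assumptions: the buffer holds bare i64 values of left_sorted in ascending order, and the function name prune_left_buffer and its parameters are hypothetical, not part of this PR's API.

```rust
use std::collections::VecDeque;

/// Drops buffered left rows that can no longer satisfy
/// `left_sorted > right_sorted - offset` for any future right row.
/// Assumes both inputs arrive in ascending order (hypothetical helper).
fn prune_left_buffer(buffer: &mut VecDeque<i64>, latest_right: i64, offset: i64) -> usize {
    // Future right values are >= latest_right, so a left row can only
    // match them if it is strictly greater than this bound.
    let bound = latest_right - offset;
    let mut dropped = 0;
    while let Some(&front) = buffer.front() {
        if front > bound {
            break; // the buffer is sorted, so every remaining row is kept
        }
        buffer.pop_front();
        dropped += 1;
    }
    dropped
}

fn main() {
    // Left rows buffered so far, in ascending order.
    let mut left_buffer: VecDeque<i64> = (1228..=1236).collect();
    // The latest right value is 1234 and the filter is
    // left_sorted > right_sorted - 3, so the bound is 1231.
    let dropped = prune_left_buffer(&mut left_buffer, 1234, 3);
    // Prints: dropped 4 rows, oldest kept = Some(1232)
    println!("dropped {dropped} rows, oldest kept = {:?}", left_buffer.front());
}
```

Because the buffer is sorted, pruning only ever pops from the front, which is why a deque is a natural fit for this sketch.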
What changes are included in this PR?
The main features included in this PR are:
PhysicalExpr trees and update column bounds efficiently for data pruning purposes.
PipelineFixer to utilize SHJ instead of ordinary Hash Join when joining two (unbounded) streams.
In order to have a PR with a manageable size, some features have been excluded for now, but will be added in the future. These include:
PhysicalExprs.
Performance Gains
SHJ not only makes sliding windows pipeline-friendly; thanks to data pruning, it also improves execution throughput in many non-streaming scenarios. Data pruning lowers memory requirements, improves cache efficiency, and opens the door to executing joins entirely in memory for large datasets with short sliding window join filters. You can find a detailed performance analysis for various scenarios here.
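For readers new to the operator, the symmetric probing idea that makes SHJ pipeline-friendly can be sketched as below. This is a toy, assuming a plain equality key and integer payloads. The names ToySymmetricJoin, Side and push are hypothetical and do not reflect the implementation in this PR, which additionally handles join filters and prunes its buffers.

```rust
use std::collections::HashMap;

/// Which input a row arrived on.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Side {
    Left,
    Right,
}

/// Toy symmetric hash join on an equality key (hypothetical type).
/// Each side keeps its own hash table, so neither input has to be
/// fully consumed before the other one can be probed.
#[derive(Default)]
struct ToySymmetricJoin {
    left: HashMap<u32, Vec<i64>>,
    right: HashMap<u32, Vec<i64>>,
}

impl ToySymmetricJoin {
    /// Inserts the row into its own side's table, then probes the other
    /// side and returns the matches as (left_value, right_value) pairs.
    fn push(&mut self, side: Side, key: u32, value: i64) -> Vec<(i64, i64)> {
        let (own, other) = match side {
            Side::Left => (&mut self.left, &self.right),
            Side::Right => (&mut self.right, &self.left),
        };
        own.entry(key).or_default().push(value);
        other
            .get(&key)
            .into_iter()
            .flatten()
            .map(|&v| match side {
                Side::Left => (value, v),
                Side::Right => (v, value),
            })
            .collect()
    }
}

fn main() {
    let mut join = ToySymmetricJoin::default();
    // The first left row finds nothing to match yet.
    assert!(join.push(Side::Left, 1, 10).is_empty());
    // A right row with the same key matches the buffered left row.
    println!("{:?}", join.push(Side::Right, 1, 20)); // [(10, 20)]
    // A later left row matches the buffered right row immediately.
    println!("{:?}", join.push(Side::Left, 1, 11)); // [(11, 20)]
}
```

Without pruning, both tables in this toy grow without bound on unbounded inputs, which is the problem the range-based pruning described earlier addresses.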
Are these changes tested?
Yes, both deterministic and randomized (fuzz) unit tests are added.
Are there any user-facing changes?
No backwards-incompatible changes.
This change simply creates new use cases in streaming applications. Below, we provide several usage patterns we may start to see more often in the wild, given that we have stream join capabilities:
More examples on table registration can be found in the subroutines we employ in the unbounded_file_with_symmetric_join test under datafusion/core/tests/fifo.rs.
On the query side, one will be able to execute a query like
in a streaming fashion, so we may see some new usage patterns arise at the query level too.