Support for Sliding Windows Joins with Symmetric Hash Join (SHJ) #5322

Merged: 49 commits merged into apache:main on Mar 1, 2023
Conversation

@metesynnada (Contributor)

Which issue does this PR close?

Closes #5321.

This is a large pull request, with over 5.4k added lines. However, we made sure to include detailed comments and extensive testing, and the changes break down as follows:

  • 308 lines changed in Cargo.lock.
  • 2,300 lines of additional testing code.
  • Over 1,000 lines of new or improved comments.

Rationale for this change

Symmetric Hash Join (SHJ) extends DataFusion's join capabilities by efficiently supporting join filter expressions on columns with order guarantees. This use case arises commonly in time-series contexts, e.g. use cases involving sliding windows. While an ordinary hash join remains the preferable option when both sources are finite, a PipelineFixer sub-rule changes the join type to SHJ when both sources are unbounded.

To see how important this feature is, consider how a typical stream processing engine like Apache Flink or Apache Spark performs such joins using watermarks. Let's examine a query taken from the Apache Spark docstring:

SELECT * FROM left_table, right_table
WHERE
    left_key = right_key AND
    left_time > right_time - INTERVAL 12 MINUTES AND
    left_time < right_time + INTERVAL 2 HOUR

In this query, each join side has a time column, named "left_time" and "right_time", and the query has the join condition "left_time > right_time - 12 min". While processing, suppose the watermark on the right input is "12:34". From then on, only right input rows with "right_time > 12:34" will be processed; any older rows are considered "too late" and dropped. Consequently, the left side buffer only needs to keep rows where "left_time > right_time - 12 min > 12:34 - 12 min = 12:22". That is, the left state watermark is 12:22, and any rows older than that can be dropped from the state. In other words, the operator discards every state entry whose timestamp (from the input rows) is older than the state watermark.

However, this is only part of the picture. In theory, range-based pruning can be done with any sorted column (not just the watermark column) and with any join filter condition that is amenable to this type of data pruning. Apache Spark, however, specializes to timestamps and ties the pruning operation to a watermark. Let's follow a different approach and examine the following query from a more general, first-principles perspective:

SELECT * FROM left_table, right_table
WHERE
  left_key = right_key AND
  left_sorted > right_sorted - 3 AND
  left_sorted < right_sorted + 10

If the sort orders of the two columns (left_sorted and right_sorted) are both ascending, the join condition includes left_sorted > right_sorted - 3, and the latest value on the right input is 1234, then the left side buffer only has to keep rows where left_sorted > 1231; any rows before this boundary can be dropped from the buffer. Note that there is nothing special about this example; similar scenarios can manifest with a variety of orderings and join filter expressions.
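To make this concrete, here is a minimal, self-contained Rust sketch of the boundary computation. The function name prune_boundary is ours, purely for illustration; the actual implementation derives such bounds via interval analysis over the filter expression.

/// For the filter `left_sorted > right_sorted - offset` with both columns
/// ascending, compute the smallest `left_sorted` value that can still match
/// a future right-side row. Buffered left rows at or below this boundary
/// can safely be dropped.
fn prune_boundary(latest_right_sorted: i64, offset: i64) -> i64 {
    // `right_sorted` only grows, so its latest value bounds all future
    // right rows from below; subtracting the offset yields the boundary.
    latest_right_sorted - offset
}

fn main() {
    // From the example above: latest right value 1234, filter offset 3.
    let boundary = prune_boundary(1234, 3);
    assert_eq!(boundary, 1231);
    println!("left rows with left_sorted <= {boundary} can be pruned");
}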

Please refer to the blog post for more information.

What changes are included in this PR?

The main features included in this PR are:

  • An initial library for interval arithmetic, which includes basic arithmetic operations (addition and subtraction) and comparison operations (greater than and less than) for integer types, and supports the logical conjunction operator (see the sketch after this list).
  • An API for performing interval calculations, which can be used for other purposes, such as range pruning in Parquet. Within the context of this PR, we use this functionality to calculate filter expression bounds for pruning purposes.
  • A constraint propagation module that constructs expression DAGs from PhysicalExpr trees and updates column bounds efficiently for data pruning purposes.
  • An initial implementation of SHJ, which is limited to the partitioned mode and does not yet have full support for output order information.
  • A new sub-rule for PipelineFixer that utilizes SHJ instead of an ordinary hash join when joining two (unbounded) streams.
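As promised above, here is a minimal sketch of closed-interval arithmetic over integers. The Interval type and its methods are illustrative stand-ins, not the new library's actual API:

/// A closed integer interval [lower, upper]: the basic building block for
/// computing bounds of filter expressions. Illustrative only.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Interval {
    lower: i64,
    upper: i64,
}

impl Interval {
    /// [a, b] + [c, d] = [a + c, b + d]
    fn add(self, rhs: Interval) -> Interval {
        Interval { lower: self.lower + rhs.lower, upper: self.upper + rhs.upper }
    }
    /// [a, b] - [c, d] = [a - d, b - c]
    fn sub(self, rhs: Interval) -> Interval {
        Interval { lower: self.lower - rhs.upper, upper: self.upper - rhs.lower }
    }
    /// Evaluate `self > rhs` as a boolean interval:
    /// [1, 1] = certainly true, [0, 0] = certainly false, [0, 1] = unknown.
    fn gt(self, rhs: Interval) -> Interval {
        if self.lower > rhs.upper {
            Interval { lower: 1, upper: 1 }
        } else if self.upper <= rhs.lower {
            Interval { lower: 0, upper: 0 }
        } else {
            Interval { lower: 0, upper: 1 }
        }
    }
}

fn main() {
    // Buffered left_sorted values span [100, 200]; the latest right value
    // 1234, shifted by the filter constant: right_sorted - 3 = [1231, 1231].
    let left = Interval { lower: 100, upper: 200 };
    let shifted = Interval { lower: 1234, upper: 1234 }
        .sub(Interval { lower: 3, upper: 3 });
    // `left_sorted > right_sorted - 3` is certainly false for these rows,
    // so they can never match a future right row and may be pruned.
    assert_eq!(left.gt(shifted), Interval { lower: 0, upper: 0 });
    // Addition, shown for completeness.
    assert_eq!(left.add(shifted), Interval { lower: 1331, upper: 1431 });
}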

In order to have a PR with a manageable size, some features have been excluded for now, but will be added in the future. These include:

  • Improved support for interval arithmetic, such as support for open/closed intervals, multiply/divide operations, additional comparison and logical operators, floating point numbers, and time intervals.
  • Improved constraint propagation, including the ability to determine monotonicity properties of complex PhysicalExprs.
  • An improved SHJ algorithm, including support for collect left/right/all modes, intermediate buffers for complex expressions, and an improved output ordering flag.

Performance Gains

SHJ not only makes sliding-window joins pipeline-friendly, it also improves execution throughput in many non-streaming scenarios thanks to data pruning. Data pruning lowers memory requirements, improves cache efficiency, and opens the door to executing joins entirely in memory for large datasets with short sliding-window join filters. You can find a detailed performance analysis for various scenarios here.

Are these changes tested?

Yes, both deterministic and fuzz unit tests are added.

Are there any user-facing changes?

No backwards-incompatible changes.

This change simply enables new use cases in streaming applications. Below, we provide several usage patterns we may start to see more often in the wild, now that we have stream join capabilities:

  • Marking sources as infinite and providing a schema:
let fifo_options = CsvReadOptions::new()
    .schema(schema.as_ref())
    .has_header(false) // Optional
    .mark_infinite(true);
  • Specifying an ordering for columns where an a priori order is known:
let file_sort_order = [datafusion_expr::col("col_name")]
    .into_iter()
    .map(|e| {
        let ascending = true;
        let nulls_first = false;
        e.sort(ascending, nulls_first)
    })
    .collect::<Vec<_>>();
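Putting these options to use, registering an unbounded CSV source might look like the sketch below. The schema, table name, and FIFO path are placeholders made up for illustration; register_csv is the standard SessionContext API, and attaching the file_sort_order to the table is elided here:

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Placeholder schema for the FIFO source.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a1", DataType::Int32, false),
        Field::new("a2", DataType::Int32, false),
    ]));
    let fifo_options = CsvReadOptions::new()
        .schema(schema.as_ref())
        .has_header(false)
        .mark_infinite(true);
    // "/tmp/left.fifo" is a placeholder path to a named pipe feeding the
    // stream; queries over this table can then run in a streaming fashion.
    ctx.register_csv("left_table", "/tmp/left.fifo", fifo_options).await?;
    Ok(())
}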

More examples on table registration can be found in the subroutines we employ in the unbounded_file_with_symmetric_join test under datafusion/core/tests/fifo.rs.

On the query side, one will be able to execute a query like

SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left_table AS t1
FULL JOIN right_table AS t2
ON t1.a2 = t2.a2
    AND t1.a1 > t2.a1 + 3
    AND t1.a1 < t2.a1 + 10

in a streaming fashion, so we may see some new usage patterns arise at the query level too.

@ozankabak (Contributor) left a comment:

@metesynnada's last changes address all the feedback we got in the initial wave of reviews. As discussed, there will be follow-on PRs to improve/extend the functionality 🚀

An inline review thread followed, on this diff hunk:

DataType::UInt16 => ScalarValue::UInt16(Some(0)),
DataType::UInt32 => ScalarValue::UInt32(Some(0)),
DataType::UInt64 => ScalarValue::UInt64(Some(0)),
DataType::Float32 => ScalarValue::UInt64(Some(0)),
Contributor:
This should be float32 and float64?

Contributor:
Right, that's a typo -- just sent a commit to fix it as well as the new merge conflicts.
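For reference, the corrected arms presumably map each float type to its own zero-valued scalar, along these lines:

DataType::Float32 => ScalarValue::Float32(Some(0.0)),
DataType::Float64 => ScalarValue::Float64(Some(0.0)),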

@alamb (Contributor) left a comment:

I think this PR is ready to merge -- @mingmwang if you agree perhaps you can click the button?

@alamb (Contributor) commented Feb 28, 2023

Thanks a lot for all the work @metesynnada and @ozankabak

@ozankabak (Contributor):

I resolved the latest conflicts, @alamb this is good to go

@alamb alamb merged commit 3c1e4c0 into apache:main Mar 1, 2023
@alamb (Contributor) commented Mar 1, 2023

Thanks again everyone

@ursabot commented Mar 1, 2023

Benchmark runs are scheduled for baseline = 03fbf9f and contender = 3c1e4c0. 3c1e4c0 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. (All Conbench comparison runs were skipped: benchmarking of arrow-datafusion commits is not supported on the available runners.)

@alamb (Contributor) commented Mar 9, 2023

Filed #5535 to track the work to unify interval analysis
