refactor: modified JoinHashMap build order for HashJoinStream #8658

Merged
merged 6 commits into from
Dec 30, 2023

Contributor

@korowa korowa commented Dec 26, 2023

Which issue does this PR close?

Closes #8130.

Rationale for this change

The least invasive way to let HashJoinStream process probe-side batch records in their original order is to adjust the build-side input so that update_hash produces a FIFO hash map, using the current data structure without any modifications. This can be achieved by iterating over the build-side input in reverse.

What changes are included in this PR?

  • collect_left_input iterates over collected batches in reverse order
  • update_hash and build_equal_condition_join_indices now accept a fifo_hashmap argument, which determines the iteration order for input batches (SymmetricHashJoin execution has not changed)
  • default implementations for update_from_iter & get_matched_indices in the JoinHashMapType trait, to relieve the join helper functions of HashMap-specific code
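
To illustrate why reverse iteration yields FIFO chains, here is a minimal sketch of a chained hash map. It is not DataFusion's JoinHashMap: `ChainedMap`, `with_rows`, `matches`, and `build` are hypothetical names, and a std `HashMap` stands in for the real table. The assumption is that each insert makes the new row the head of its key's chain, so the last row inserted is the first one visited during probing.

```rust
use std::collections::HashMap;

/// Simplified stand-in for a chained join hash map (hypothetical type).
/// `map` stores the head of each chain as `row index + 1`;
/// `next[i]` stores the entry that follows row `i` (also `+ 1`), 0 = end of chain.
struct ChainedMap {
    map: HashMap<u64, u64>,
    next: Vec<u64>,
}

impl ChainedMap {
    fn with_rows(num_rows: usize) -> Self {
        Self { map: HashMap::new(), next: vec![0; num_rows] }
    }

    /// Inserts `(row, key)` pairs in iterator order.
    /// The most recently inserted row becomes the head of its chain.
    fn update_from_iter(&mut self, rows: impl Iterator<Item = (usize, u64)>) {
        for (row, key) in rows {
            let prev_head = self.map.insert(key, row as u64 + 1).unwrap_or(0);
            self.next[row] = prev_head;
        }
    }

    /// Walks the chain for `key` and returns row indices in chain order.
    fn matches(&self, key: u64) -> Vec<usize> {
        let mut out = Vec::new();
        let mut cur = self.map.get(&key).copied().unwrap_or(0);
        while cur != 0 {
            let row = (cur - 1) as usize;
            out.push(row);
            cur = self.next[row];
        }
        out
    }
}

/// Builds the map from build-side keys, optionally iterating in reverse.
fn build(keys: &[u64], reverse: bool) -> ChainedMap {
    let mut m = ChainedMap::with_rows(keys.len());
    let rows = keys.iter().copied().enumerate();
    if reverse {
        m.update_from_iter(rows.rev());
    } else {
        m.update_from_iter(rows);
    }
    m
}
```

With keys `[10, 20, 10, 10]`, `build(&keys, false).matches(10)` visits rows 3, 2, 0 (newest first), while `build(&keys, true).matches(10)` visits rows 0, 2, 3, which is the original build-side order.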

Are these changes tested?

assert_batches_sorted_eq replaced with assert_batches_eq for Inner / RightSemi / RightAnti non-partitioned join test cases to validate that join output preserves input(s) order.
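
As a rough illustration of why this swap matters, the sketch below contrasts an order-insensitive comparison with an order-sensitive one on plain string rows. The helpers `eq_sorted` and `eq_ordered` are hypothetical and only approximate the idea behind the two macros; they are not the macros' actual implementation.

```rust
/// Order-insensitive comparison: sorts copies of both sides before comparing.
fn eq_sorted(expected: &[&str], actual: &[&str]) -> bool {
    let mut e = expected.to_vec();
    let mut a = actual.to_vec();
    e.sort_unstable();
    a.sort_unstable();
    e == a
}

/// Order-sensitive comparison: rows must match position by position.
fn eq_ordered(expected: &[&str], actual: &[&str]) -> bool {
    expected == actual
}
```

A join that emits the right rows in the wrong order passes the first check and fails the second, so only the order-sensitive form can confirm that input order is preserved.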

Are there any user-facing changes?

No.

@korowa korowa changed the title from "feat: modified JoinHashMap build order for HashJoinStream" to "refactor: modified JoinHashMap build order for HashJoinStream" Dec 26, 2023
@metesynnada
Contributor

metesynnada commented Dec 28, 2023

Hi @korowa, this is a great extension. I suggest adding explanations in the code base, including docstrings for the mechanism, and updating the docstring of the hash join. You can use https://textik.com, a great tool for drawing diagrams in docstrings.

I have another question regarding keeping the "head" inside the JoinHashMap next array. I've noticed that with each new row addition, the previous row indices need to be scanned, making the addition complexity linear instead of constant. Have you run performance benchmarks on low cardinality key columns to see how this affects performance?

@korowa
Contributor Author

korowa commented Dec 28, 2023

@metesynnada thank you!

Regarding explanations -- I've added an extended explanation of how the build phase works to the HashJoinExec docstring -- I hope that, with this description of "the whole picture", the other modified / added comments will make more sense.

Regarding performance -- benchmark results are

--------------------
Benchmark tpch_mem.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃   master ┃ hash_join_build_order ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 156.09ms │              155.86ms │     no change │
│ QQuery 2     │  36.49ms │               36.30ms │     no change │
│ QQuery 3     │  70.99ms │               71.59ms │     no change │
│ QQuery 4     │  61.01ms │               58.94ms │     no change │
│ QQuery 5     │ 104.64ms │              101.76ms │     no change │
│ QQuery 6     │  16.71ms │               16.67ms │     no change │
│ QQuery 7     │ 274.75ms │              283.66ms │     no change │
│ QQuery 8     │  65.54ms │               65.67ms │     no change │
│ QQuery 9     │ 103.82ms │              104.10ms │     no change │
│ QQuery 10    │ 123.52ms │              123.01ms │     no change │
│ QQuery 11    │  28.87ms │               27.55ms │     no change │
│ QQuery 12    │  50.39ms │               52.08ms │     no change │
│ QQuery 13    │  59.11ms │               58.02ms │     no change │
│ QQuery 14    │  21.04ms │               18.94ms │ +1.11x faster │
│ QQuery 15    │  47.09ms │               46.57ms │     no change │
│ QQuery 16    │  32.32ms │               33.87ms │     no change │
│ QQuery 17    │ 150.53ms │              140.37ms │ +1.07x faster │
│ QQuery 18    │ 324.83ms │              322.20ms │     no change │
│ QQuery 19    │  46.00ms │               45.74ms │     no change │
│ QQuery 20    │  83.35ms │               83.45ms │     no change │
│ QQuery 21    │ 261.97ms │              265.00ms │     no change │
│ QQuery 22    │  28.01ms │               25.16ms │ +1.11x faster │
└──────────────┴──────────┴───────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                    ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (master)                  │ 2147.08ms │
│ Total Time (hash_join_build_order)   │ 2136.50ms │
│ Average Time (master)                │   97.59ms │
│ Average Time (hash_join_build_order) │   97.11ms │
│ Queries Faster                       │         3 │
│ Queries Slower                       │         0 │
│ Queries with No Change               │        19 │
└──────────────────────────────────────┴───────────┘

(the faster queries are probably nothing more than outliers)

> I've noticed that with each new row addition, the previous row indices need to be scanned, making the addition complexity linear instead of constant.

Could you please recheck, or point to the LOC where the scanning takes place? If I got it right -- the problem should be in JoinHashMap::update_from_iter, but the worst case for insertion there seems to be

  • obtain the index currently stored in the hashmap for the hash value
  • place it at specific position in next vector
  • update hashmap with a new index

which should not produce linear complexity 🤔 (at least it definitely shouldn't perform worse than before)
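
The three steps above can be sketched as a single function with no loop over earlier rows. This is an illustrative assumption, not the code from the PR: `insert_row` is a hypothetical helper, a std `HashMap` replaces the real table, and indices are stored as `row + 1` so that 0 can mark the end of a chain.

```rust
use std::collections::HashMap;

/// Hypothetical helper mirroring the three insertion steps.
/// Each call touches one map entry and one slot of `next`, regardless of
/// how many rows already share the same hash value.
fn insert_row(map: &mut HashMap<u64, u64>, next: &mut [u64], hash: u64, row: usize) {
    // 1. Obtain the index currently stored for this hash value (0 if none).
    let prev = map.get(&hash).copied().unwrap_or(0);
    // 2. Place it at the new row's position in `next`.
    next[row] = prev;
    // 3. Update the map with the new index.
    map.insert(hash, row as u64 + 1);
}
```

Even in the lowest-cardinality case, where every row has the same hash value, each insertion only links the new row to the previous chain head, so the per-row cost stays constant (amortized, given the hash map).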

Contributor

@alamb alamb left a comment

Thank you @korowa -- this is a really nice refactor. I read it carefully and it looks very nice to me.

cc @Dandandan and @liukun4515 for your interest

@@ -156,8 +155,48 @@ impl JoinLeftData {
 ///
 /// Execution proceeds in 2 stages:
 ///
-/// 1. the **build phase** where a hash table is created from the tuples of the
-/// build side.
+/// 1. the **build phase** creates a hash table from the tuples of the build side,
Contributor

thank you for this

@@ -1558,7 +1573,9 @@ mod tests {
"| 3 | 5 | 9 | 20 | 5 | 80 |",
"+----+----+----+----+----+----+",
];
assert_batches_sorted_eq!(expected, &batches);

// Inner join output is expected to preserve both inputs order
Contributor

👍

datafusion/physical-plan/src/joins/hash_join.rs (outdated)
"| 3 | 5 | 9 | 20 | 5 | 80 |",
"| 2 | 5 | 8 | 20 | 5 | 80 |",
Contributor

it is also neat to see this keep the same order as the input

@@ -151,6 +151,82 @@ pub trait JoinHashMapType {
fn get_map(&self) -> &RawTable<(u64, u64)>;
/// Returns a reference to the next.
fn get_list(&self) -> &Self::NextType;

Contributor

JoinHashMap and this trait are getting big enough to maybe warrant moving to their own module (in a separate PR, perhaps)

Contributor Author

@korowa korowa Dec 28, 2023

Maybe partial join output will add some more logic to JoinHashMap and make a separate module reasonable (or even required) at that stage

Contributor

@metesynnada metesynnada left a comment

I guess I misunderstood an explanation in the related issue. This shouldn't degrade the performance, nicely done. However, are the JoinHashMap struct comments relevant after this change?

@korowa
Contributor Author

korowa commented Dec 29, 2023

> However, are the JoinHashMap struct comments relevant after this change?

@metesynnada, yes, the PR only affects the JoinHashMap input -- all its internals remain as they were before, so all docs and comments for both JoinHashMap and the moved pieces of code are still relevant. The only comment modification relates to the removed offset argument in update_from_iter, as it's now the caller's responsibility to provide correct row indices in the iterator.

@korowa
Contributor Author

korowa commented Dec 29, 2023

The failed tests look related to chrono-tz 0.8.5 + eggert/tz@1d0ae80 🤔

@korowa
Contributor Author

korowa commented Dec 29, 2023

Pushed a separate PR, because the new chrono-tz release will affect test runs on any branch.

@alamb
Contributor

alamb commented Dec 30, 2023

Thank you @korowa -- both for this PR and for debugging #8677

Successfully merging this pull request may close these issues.

FIFO JoinHashMap for HashJoin
3 participants