Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Build hash table while adding input rows for left semi and anti join #7066

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

liujiayi771
Copy link
Contributor

As discussed in the issue, during the hash build process, left semi join and anti join can deduplicate the input rows based on the join key. However, Velox's hash build addInput process adds all inputs to the RowContainer, which can result in significant memory wastage in certain scenarios, such as TPCDS Q14 and Q95. To address this, we can construct the hash table directly during data input and utilize the existing allowDuplicates parameter of the hashTable to remove duplicate data without storing it in the RowContainer. This process is similar to constructing a hash table in the hash aggregation process.

Due to Velox's hash build potentially having multiple drivers executing, in this scenario, duplicate data can only be removed for individual driver inputs. However, in the case of single-driver execution mode, it is possible to remove all duplicate data.

@netlify
Copy link

netlify bot commented Oct 16, 2023

Deploy Preview for meta-velox canceled.

Name Link
🔨 Latest commit 92d0e67
🔍 Latest deploy log https://app.netlify.com/sites/meta-velox/deploys/66fe192ecee722000877f871

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Oct 16, 2023
@liujiayi771 liujiayi771 marked this pull request as draft October 16, 2023 06:17
@liujiayi771
Copy link
Contributor Author

liujiayi771 commented Oct 16, 2023

I use Gluten to perform testing, and I have observed significant reductions in peak memory usage and execution time for hash build in the case of a 1TB TPCDS dataset Q95.
create hash table after all inputs have been added to RowContainer:

stream input rows to hash table, and deduplicate the input rows:

@liujiayi771
Copy link
Contributor Author

Hi @mbasmanova. Do you have any suggestions for this optimization?

@liujiayi771 liujiayi771 force-pushed the left-semi branch 3 times, most recently from 893ad56 to efb2eea Compare October 16, 2023 10:07
@@ -751,7 +751,7 @@ class HashTable : public BaseHashTable {
// or distinct mode VectorHashers in a group by hash table. 0 for
// join build sides.
int32_t reservePct() const {
return isJoinBuild_ ? 0 : 50;
return (isJoinBuild_ && rows_->nextOffset()) ? 0 : 50;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an unrelated fix? Maybe extract it into a separate PR and explain what it does.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, there is no variable in the HashTable that can represent whether duplicate data is allowed. However, rows_->nextOffset() can be used to indicate that duplicate data is not allowed. For example, in the HashTable of a left semi join, rows_->nextOffset() is 0. Introducing a new variable allowDuplicates in HashTable here may make it clearer.

I feel that if HashBuild builds a hash table while adding input rows, it should also reserve some values for new keys, just like agg. I'm not sure if my understanding of reservePct is correct, please correct me if I'm wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that if HashBuild builds a hash table while adding input rows, it should also reserve some values for new keys, just like agg.

Makes sense. Let's make sure this code path is not activated when we are building hash table after seeing all the keys though.

Copy link
Contributor

@mbasmanova mbasmanova left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liujiayi771 Thank you for the optimization. Overall this makes sense to me. Just to clarify, my understand is the following.

Before this change, HashBuild operator first added all input data into RowContainer, then built a hash table over it. Now, HashBuild operator is building hash table while adding input rows. In case when HashBuild operator runs single-threaded there is no need to build hash table again after processing all input. In case when HashBuild operator runs multi-threaded it is still necessary to re-build the hash table after all input has been received an combined.

@liujiayi771
Copy link
Contributor Author

@mbasmanova Yes. For single-threaded scenarios, further optimization can be done to avoid rebuilding the hash table.

@liujiayi771 liujiayi771 changed the title Stream input rows to hash table when addInput for left semi and anti join Build hash table while adding input rows for left semi and anti join Oct 16, 2023
@liujiayi771
Copy link
Contributor Author

liujiayi771 commented Oct 19, 2023

Hi @mbasmanova. During the process of fixing the UT, I discovered that regardless of whether it is a single-threaded hash build or not, it is necessary to call table_->prepareJoinTable in the finishHashBuild to perform a rehash. Although the hash table is already correct and usable at this point, there may still be some rows that have not been added to uniqueValues_ in VectorHasher. This may result in the generated dynamic filter not including all the values. Therefore, during the final rehash, it will execute table->analyze to add all rows to uniqueValues_.

@liujiayi771
Copy link
Contributor Author

liujiayi771 commented Oct 19, 2023

Hi @xiaoxmeng.
I'm not very familiar with arbitration, could you give me some advice? Can the current modifications to the hash table build process be compatible with the current arbitration for HashBuild?

In addition, I made some modifications in SharedArbitrationTest in order to make the test pass. For example, the current anti join deduplicates and does not store non-join key columns, which reduces memory usage. Therefore, we may need to add a seed to ensure that newVector in SharedArbitrationTest generates a different vector each time it is called.

Additionally, it may be necessary to reduce the joinMemoryUsage in the tests to trigger the execution of "fakeAllocation". And I have changed the join key to the varchar type, allowing the fuzzer to generate more different join keys and occupy more memory. I'm not sure if what I'm doing is reasonable.

@liujiayi771 liujiayi771 marked this pull request as ready for review October 20, 2023 01:52
@liujiayi771
Copy link
Contributor Author

cc @Yohahaha

@liujiayi771
Copy link
Contributor Author

@xiaoxmeng Could you help review this change?

@xiaodouchen
Copy link
Contributor

@liujiayi771 @mbasmanova Is there any latest progress on this issue? I am very interested in it.

I've been exploring the implementation details and have a question regarding the efficiency of the hash table construction within the addInput method. In cases where the input data has a very low duplication rate, it will be inefficient to build the hash table at this stage given that it will finally re-build the hash table.

Additionally, for left semi join and anti join, is there an opportunity to only optimize by storing only the join keys in the RowContainer?

@mbasmanova
Copy link
Contributor

@xiaodouchen

or left semi join and anti join, is there an opportunity to only optimize by storing only the join keys in the RowContainer?

I believe this should be done in the optimizer. The query plan passed to Velox should not have non-join keys on the build side.

@mbasmanova
Copy link
Contributor

In cases where the input data has a very low duplication rate, it will be inefficient to build the hash table at this stage given that it will finally re-build the hash table.

@xiaodouchen That's a good point. Perhaps, this logic can be adaptive. If after processing some data it sees low duplication rate, it stops building a hash table.

@liujiayi771
Copy link
Contributor Author

liujiayi771 commented Dec 13, 2023

@xiaodouchen @mbasmanova If the data has a very low duplication rate, it will indeed be time-consuming when rehashing at the end. A similar approach to the early abandonment in 'agg' can be adopted.

Regarding not storing non-join keys in the row container, I understand that this has already been implemented. You can see it here, but the actual situation is that these columns will not be passed down. The optimizer will pass only the join key columns through a project.

@xiaodouchen
Copy link
Contributor

@liujiayi771 @mbasmanova Thank you for your reply!

A similar approach to the early abandonment in 'agg' can be adopted.

Is there a planned roadmap to support this? Besides, what's the plan for this pr?

@liujiayi771 I noticed that TPCH includes some semi-join cases. Have you conducted any performance tests with TPCH to evaluate the impact of your changes? I'm particularly interested in knowing the performance improvements in TPCH.

@liujiayi771
Copy link
Contributor Author

@xiaodouchen The main purpose of this modification is to reduce memory consumption caused by duplicate data, and the performance improvement also primarily comes from the reduction in data volume. I have tested 10T TPCDS q14 and q95 under Spark + Velox and there is a very significant performance improvement. Memory usage has dropped by more than a hundredfold for join operator, and execution speed has increased by more than 20%.

In the Velox's own TPCH benchmark, due to the smaller data volume, the improvement in speed may not be as noticeable. I can run a TPCH test on my cluster using Spark in the next couple of days to see the memory usage and speed improvement of the semi join, and I will reply with the results afterwards. I will look into optimizations based on data duplication rates in the next two weeks.

@liujiayi771
Copy link
Contributor Author

liujiayi771 commented Dec 21, 2023

Hi @xiaodouchen, I conducted a simple test on TPCH 1TB q20, q21, and q22. The optimizations made in this PR did not have a significant impact on these three queries. This could be due to the duplication rate of the build-side data. There was not much improvement in terms of execution time for these queries; the time remained mostly the same. The memory usage for q20 and q21 was also similar, with only q22 showing a noticeable decrease in memory usage for left semi-join.

image image

@zhli1142015
Copy link
Contributor

Any progress for this PR?
Thanks.

@liujiayi771
Copy link
Contributor Author

liujiayi771 commented Mar 22, 2024

Any progress for this PR? Thanks.

@zhli1142015 We've been using this PR internally, and the results have been quite positive. It's been a while since this PR was last updated, but I can continue to develop the dynamic enabling based on the duplication rate. Have you tested this pull request? Do you have any suggestions?

@zhli1142015
Copy link
Contributor

zhli1142015 commented Mar 22, 2024

Any progress for this PR? Thanks.

@zhli1142015 We've been using this PR internally, and the results have been quite positive. It's been a while since this PR was last updated, but I can continue to develop the dynamic enabling based on the duplication rate. Have you tested this pull request? Do you have any suggestions?

Thanks for your update. We observed a silimar issue, maybe we can test this internally also. @ayushi-agarwal

Thanks

@liujiayi771 liujiayi771 force-pushed the left-semi branch 3 times, most recently from 84af0e3 to d0853df Compare April 8, 2024 12:40
@liujiayi771 liujiayi771 force-pushed the left-semi branch 4 times, most recently from 2df6f68 to c7770f3 Compare April 24, 2024 09:42
@liujiayi771
Copy link
Contributor Author

Hi @mbasmanova @xiaoxmeng @Yuhta. I have recently spent some time updating this PR to include support for falling back to the original mode (only adding rows to the row container in addInput) when the duplicate ratio is low. I have also adapted some logic for join spill.

This PR addresses the issue of not needing to save duplicate data to the row container for left semi and anti join when the join key has a high duplicate rate in the build side input. Storing these duplicate data can consume a significant amount of memory, especially in TPCDS Q95. Due to the current design of Velox, which first adds input and then builds the hash table, it is not possible to deduplicate in the addInput phase. The current PR supports directly using the same mode as building the hash table in the agg, meaning building the hash table and deduplicating upon input, so the data in the hash table row container is deduplicated.

I think that Velox's join could benefit from this optimization, and I wonder if others feel that the current implementation has any issues. Alternatively, we could consider retaining the original mode but implementing a method to remove duplicate data from the row container and free up memory after addInput is completed, during the hash table build phase.

@mbasmanova
Copy link
Contributor

@liujiayi771

Thank you for working on this. Overall this optimization sounds useful. I particularly like that it seems you have made it adaptive, i.e. stop de-duplication if there are not too many duplicates.

I imagine this logic is non-trivial. It would be nice to update the PR description to provide a bit more context on the overall design and any tricky parts that you needed to handle. In particular, it would be nice to describe how the adaptivity works and which configs (if any) are introduced to control it.

Finally, wondering how did you make sure the changes are working 100%. What testing strategy did you use? How are you making sure test coverage is "complete"?

@liujiayi771
Copy link
Contributor Author

Hi @mbasmanova.
I will take some time to organize the design of this PR, and I will create an issue to discuss this design.

how did you make sure the changes are working 100%. What testing strategy did you use? How are you making sure test coverage is "complete"?

Are you referring to the adaptive stop de-duplicate? I have added two test cases, with kAbandonBuildNoDupHashMinPct and kAbandonBuildNoDupHashMinRows set to relatively low values to make it abandon de-duplicates. However, these tests may not be comprehensive enough; it might be better to add this test in the join fuzzer.

For the additional testing of this optimization, I am mainly relying on the join fuzzer and the existing test cases for left semi and anti-join to test this optimization.

@mbasmanova
Copy link
Contributor

@liujiayi771

I will take some time to organize the design of this PR, and I will create an issue to discuss this design.

That sounds great. Thank you.

However, these tests may not be comprehensive enough; it might be better to add this test in the join fuzzer.

Once the design is ready, let's review to figure out how to extend join fuzzer to provide good coverage.

Copy link

stale bot commented Jul 24, 2024

This pull request has been automatically marked as stale because it has not had recent activity. If you'd still like this PR merged, please comment on the PR, make sure you've addressed reviewer comments, and rebase on the latest main. Thank you for your contributions!

@stale stale bot added the stale label Jul 24, 2024
@stale stale bot closed this Aug 8, 2024
@aditi-pandit aditi-pandit reopened this Sep 26, 2024
@stale stale bot removed the stale label Sep 26, 2024
@aditi-pandit
Copy link
Collaborator

@liujiayi771 : Are you planning to continue this optimization ? This might be helpful for TPC-DS.

@liujiayi771
Copy link
Contributor Author

@aditi-pandit This patch has already been in use in our internal version and it has shown excellent results for q95. Could you help to review? I would like to continue developing this optimization.

@aditi-pandit
Copy link
Collaborator

@liujiayi771 : We (IBM) are interested as well. Can you rebase your code ? Will help me to pull this internally and test on our side.

Also, please can you provide your design in a doc or issue. I can help you with the reviews.

if (table_ != nullptr) {
// Set table_ to nullptr to trigger rehash.
rows_->pool()->freeContiguous(tableAllocation_);
table_ = nullptr;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to clean up capacity_ and hashMode_ variables?

Copy link
Contributor Author

@liujiayi771 liujiayi771 Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to clean up other variables. Setting table_ = nullptr here is to allow entering the rehash process in the checkSize, and it is not intended for data cleanup.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TEST_F(HashJoinTest, semiJoinDeduplicateResetCapacity) {
  // The initial size of HashTable is 2048. After detecting the
  // kAbandonBuildNoDupHashMinRows row, it is found that the duplication rate is
  // lower than kAbandonBuildNoDupHashMinPct, so no more data is inserted into
  // HashTable (the capacity_ variable does not change afterwards). However,
  // data will continue to be added to RowContainer, causing numDistinct_ to be
  // greater than 2048, and eventually an error is reported in the
  // HashTable<ignoreNullKeys>::checkSize function.
  const int vectorSize = 10, batches = 210;
  auto probeVectors = makeBatches(batches, [&](int32_t /*unused*/) {
    return makeRowVector({
        // Join Key is double -> VectorHasher::typeKindSupportsValueIds will
        // return false -> HashMode is kHash
        makeFlatVector<double>(
            vectorSize, [&](vector_size_t /*row*/) { return rand(); }),
        makeFlatVector<int64_t>(
            vectorSize, [&](vector_size_t /*row*/) { return rand(); }),
    });
  });

  auto buildVectors = makeBatches(batches, [&](int32_t batch) {
    return makeRowVector({
        makeFlatVector<double>(
            vectorSize, [&](vector_size_t /*row*/) { return rand(); }),
        makeFlatVector<int64_t>(
            vectorSize, [&](vector_size_t /*row*/) { return rand(); }),
    });
  });

  createDuckDbTable("t", probeVectors);
  createDuckDbTable("u", buildVectors);

  auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
  auto plan = PlanBuilder(planNodeIdGenerator)
                  .values(probeVectors)
                  .project({"c0 AS t0", "c1 AS t1"})
                  .hashJoin(
                      {"t0"},
                      {"u0"},
                      PlanBuilder(planNodeIdGenerator)
                          .values(buildVectors)
                          .project({"c0 AS u0", "c1 AS u1"})
                          .planNode(),
                      "",
                      {"t0", "t1", "match"},
                      core::JoinType::kLeftSemiProject)
                  .planNode();

  HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
      .config(core::QueryConfig::kAbandonBuildNoDupHashMinRows, "10")
      .config(core::QueryConfig::kAbandonBuildNoDupHashMinPct, "50")
      .numDrivers(1)
      .checkSpillStats(false)
      .planNode(plan)
      .referenceQuery(
          "SELECT t.c0, t.c1, EXISTS (SELECT * FROM u WHERE t.c0 = u.c0) FROM t")
      .run();
}

This case will not work without resetting the capacity variable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@XinShuoWang Thank you for your review. I understand that when "abandon" is triggered, the capacity_ is not accurate and needs to be reset. I will make the necessary modifications and incorporate your test cases.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants