Build hash table while adding input rows for left semi and anti join #7066
Conversation
Hi @mbasmanova. Do you have any suggestions for this optimization?
velox/exec/HashTable.h
@@ -751,7 +751,7 @@ class HashTable : public BaseHashTable {
   // or distinct mode VectorHashers in a group by hash table. 0 for
   // join build sides.
   int32_t reservePct() const {
-    return isJoinBuild_ ? 0 : 50;
+    return (isJoinBuild_ && rows_->nextOffset()) ? 0 : 50;
Is this an unrelated fix? Maybe extract it into a separate PR and explain what it does.
Currently, there is no variable in HashTable that indicates whether duplicate data is allowed. However, rows_->nextOffset() can serve that purpose: when duplicates are not allowed, as in the HashTable of a left semi join, rows_->nextOffset() is 0. Introducing a new allowDuplicates variable in HashTable might make this clearer.
I feel that if HashBuild builds a hash table while adding input rows, it should also reserve some space for new keys, just like agg. I'm not sure whether my understanding of reservePct is correct; please correct me if I'm wrong.
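For readers following along, here is a minimal self-contained sketch of the mechanism being discussed; the struct, field names, and the rehash condition are illustrative assumptions rather than the actual Velox HashTable code.

#include <cstdint>

// Illustrative only: nextOffset() == 0 stands in for "duplicates not
// allowed", and reservePct() keeps 50% headroom whenever the table grows
// while rows are still being added (group by, or a de-duplicating join build).
struct TableSizingSketch {
  bool isJoinBuild = true;
  int32_t nextOffset = 0;  // 0 means rows are not chained, i.e. no duplicates.

  int32_t reservePct() const {
    return (isJoinBuild && nextOffset != 0) ? 0 : 50;
  }

  // Assumed shape of the size check: rehash once occupancy eats into the
  // reserved share of the capacity.
  bool needsRehash(int64_t numDistinct, int64_t capacity) const {
    return numDistinct > capacity - capacity * reservePct() / 100;
  }
};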
I feel that if HashBuild builds a hash table while adding input rows, it should also reserve some values for new keys, just like agg.
Makes sense. Let's make sure this code path is not activated when we are building the hash table after seeing all the keys, though.
@liujiayi771 Thank you for the optimization. Overall this makes sense to me. Just to clarify, my understanding is the following.
Before this change, the HashBuild operator first added all input data into the RowContainer and then built a hash table over it. Now, the HashBuild operator builds the hash table while adding input rows. When the HashBuild operator runs single-threaded, there is no need to build the hash table again after processing all input. When it runs multi-threaded, it is still necessary to re-build the hash table after all input has been received and combined.
@mbasmanova Yes. For single-threaded scenarios, further optimization can be done to avoid rebuilding the hash table.
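To make the two modes concrete, here is a small self-contained sketch in which std::unordered_set and std::vector stand in for the HashTable and RowContainer; it illustrates the idea rather than the actual Velox code.

#include <cstdint>
#include <unordered_set>
#include <vector>

struct SemiJoinBuildSketch {
  std::unordered_set<int64_t> hashTable;  // stands in for HashTable.
  std::vector<int64_t> rowContainer;      // stands in for RowContainer.

  // Original mode: store every input row; the hash table is built later,
  // after all input has been received.
  void addInputOriginal(const std::vector<int64_t>& joinKeys) {
    rowContainer.insert(rowContainer.end(), joinKeys.begin(), joinKeys.end());
  }

  // New mode for left semi / anti join: probe-and-insert while adding input,
  // so a key that is already present is dropped instead of stored again.
  void addInputDedup(const std::vector<int64_t>& joinKeys) {
    for (auto key : joinKeys) {
      if (hashTable.insert(key).second) {
        rowContainer.push_back(key);
      }
    }
  }
};

With a single driver the table built this way can be used directly; with multiple drivers the per-driver results still have to be combined and the table re-built, as noted above.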
Hi @mbasmanova. While fixing the UT, I discovered that regardless of whether it is a single-threaded hash build or not, it is necessary to call
Hi @xiaoxmeng. In addition, I made some modifications in
Additionally, it may be necessary to reduce the
cc @Yohahaha
@xiaoxmeng Could you help review this change?
@liujiayi771 @mbasmanova Is there any recent progress on this issue? I am very interested in it. I've been exploring the implementation details and have a question regarding the efficiency of the hash table construction within the addInput method. In cases where the input data has a very low duplication rate, it will be inefficient to build the hash table at this stage, given that the hash table will be re-built at the end anyway. Additionally, for left semi join and anti join, is there an opportunity to optimize by storing only the join keys in the RowContainer?
I believe this should be done in the optimizer. The query plan passed to Velox should not have non-join keys on the build side. |
@xiaodouchen That's a good point. Perhaps this logic can be adaptive. If after processing some data it sees a low duplication rate, it stops building a hash table.
@xiaodouchen @mbasmanova If the data has a very low duplication rate, rehashing at the end will indeed be time-consuming. A similar approach to the early abandonment in agg can be adopted. Regarding not storing non-join keys in the row container, I understand that this has already been implemented. You can see it here, but in practice these columns will not be passed down: the optimizer will pass only the join key columns through a project.
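A rough sketch of what such an early-abandon check could look like, borrowing the shape of partial aggregation's abandon logic. The threshold names match the kAbandonBuildNoDupHashMinRows / kAbandonBuildNoDupHashMinPct configs that appear later in this thread, but the exact condition is an assumption.

#include <cstdint>

// Returns true when de-duplication should be abandoned because the observed
// duplication rate is too low to pay for building the hash table early.
bool shouldAbandonDedup(
    int64_t inputRows,     // rows seen so far by this driver.
    int64_t distinctRows,  // distinct join keys among them.
    int64_t minRows,       // e.g. kAbandonBuildNoDupHashMinRows.
    int32_t minPct) {      // e.g. kAbandonBuildNoDupHashMinPct.
  if (inputRows < minRows) {
    return false;  // Not enough data yet to estimate the duplication rate.
  }
  const int64_t duplicateRows = inputRows - distinctRows;
  // Abandon when fewer than minPct percent of the rows were duplicates.
  return duplicateRows * 100 < inputRows * minPct;
}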
@liujiayi771 @mbasmanova Thank you for your reply!
Is there a planned roadmap to support this? Besides, what's the plan for this PR? @liujiayi771 I noticed that TPCH includes some semi-join cases. Have you conducted any performance tests with TPCH to evaluate the impact of your changes? I'm particularly interested in knowing the performance improvements in TPCH.
@xiaodouchen The main purpose of this modification is to reduce memory consumption caused by duplicate data, and the performance improvement also primarily comes from the reduction in data volume. I have tested 10T TPCDS q14 and q95 under Spark + Velox, and there is a very significant performance improvement. Memory usage has dropped by more than a hundredfold for the join operator, and execution speed has increased by more than 20%. In Velox's own TPCH benchmark, due to the smaller data volume, the improvement in speed may not be as noticeable. I can run a TPCH test on my cluster using Spark in the next couple of days to see the memory usage and speed improvement of the semi join, and I will reply with the results afterwards. I will look into optimizations based on data duplication rates in the next two weeks.
Hi @xiaodouchen, I conducted a simple test on TPCH 1TB q20, q21, and q22. The optimizations made in this PR did not have a significant impact on these three queries. This could be due to the duplication rate of the build-side data. There was not much improvement in terms of execution time for these queries; the time remained mostly the same. The memory usage for q20 and q21 was also similar, with only q22 showing a noticeable decrease in memory usage for left semi-join.
Any progress on this PR?
@zhli1142015 We've been using this PR internally, and the results have been quite positive. It's been a while since this PR was last updated, but I can continue to develop the dynamic enabling based on the duplication rate. Have you tested this pull request? Do you have any suggestions?
Thanks for your update. We observed a similar issue; maybe we can test this internally as well. @ayushi-agarwal Thanks
Hi @mbasmanova @xiaoxmeng @Yuhta. I have recently spent some time updating this PR to include support for falling back to the original mode (only adding rows to the row container in addInput).
This PR addresses the issue of not needing to save duplicate data to the row container for left semi and anti join when the join key has a high duplicate rate in the build-side input. Storing this duplicate data can consume a significant amount of memory, especially in TPCDS Q95. Due to the current design of Velox, which first adds input and then builds the hash table, it is not possible to deduplicate in addInput.
I think that Velox's join could benefit from this optimization, and I wonder if others feel that the current implementation has any issues. Alternatively, we could consider retaining the original mode but implementing a method to remove duplicate data from the row container and free up memory afterwards.
Thank you for working on this. Overall this optimization sounds useful. I particularly like that it seems you have made it adaptive, i.e. it stops de-duplication if there are not too many duplicates. I imagine this logic is non-trivial. It would be nice to update the PR description to provide a bit more context on the overall design and any tricky parts that you needed to handle. In particular, it would be nice to describe how the adaptivity works and which configs (if any) are introduced to control it. Finally, I am wondering how you made sure the changes are working 100%. What testing strategy did you use? How are you making sure test coverage is "complete"?
Hi @mbasmanova.
Are you referring to the adaptive stop of de-duplication? I have added two test cases for it.
For the additional testing of this optimization, I am mainly relying on the join fuzzer and the existing test cases for left semi and anti join.
That sounds great. Thank you.
Once the design is ready, let's review to figure out how to extend the join fuzzer to provide good coverage.
This pull request has been automatically marked as stale because it has not had recent activity. If you'd still like this PR merged, please comment on the PR, make sure you've addressed reviewer comments, and rebase on the latest main. Thank you for your contributions!
@liujiayi771: Are you planning to continue this optimization? This might be helpful for TPC-DS.
@aditi-pandit This patch has already been in use in our internal version, and it has shown excellent results for q95. Could you help review it? I would like to continue developing this optimization.
@liujiayi771: We (IBM) are interested as well. Can you rebase your code? That will help me pull this internally and test it on our side. Also, could you please provide your design in a doc or issue? I can help you with the reviews.
if (table_ != nullptr) {
  // Set table_ to nullptr to trigger rehash.
  rows_->pool()->freeContiguous(tableAllocation_);
  table_ = nullptr;
Is it necessary to clean up the capacity_ and hashMode_ variables?
No need to clean up other variables. Setting table_ = nullptr here is to allow entering the rehash process in checkSize; it is not intended for data cleanup.
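A small self-contained illustration of this point; the member names and the size-check condition are assumptions, not the actual HashTable internals. Once the table allocation is freed and the pointer cleared, the next size check has no table to keep and must go through the rehash path.

#include <algorithm>
#include <cstdint>
#include <vector>

struct RehashTriggerSketch {
  std::vector<int64_t>* table = nullptr;  // stands in for the table allocation.
  int64_t capacity = 0;
  int64_t numDistinct = 0;

  void checkSize() {
    // A null table is treated like an over-full one, so clearing the pointer
    // is enough to force a rebuild.
    if (table == nullptr || numDistinct > capacity / 2) {
      rehash();
    }
  }

  void rehash() {
    delete table;
    capacity = std::max<int64_t>(2 * numDistinct, 2048);
    table = new std::vector<int64_t>(static_cast<size_t>(capacity));
  }
};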
TEST_F(HashJoinTest, semiJoinDeduplicateResetCapacity) {
// The initial size of HashTable is 2048. After detecting the
// kAbandonBuildNoDupHashMinRows row, it is found that the duplication rate is
// lower than kAbandonBuildNoDupHashMinPct, so no more data is inserted into
// HashTable (the capacity_ variable does not change afterwards). However,
// data will continue to be added to RowContainer, causing numDistinct_ to be
// greater than 2048, and eventually an error is reported in the
// HashTable<ignoreNullKeys>::checkSize function.
const int vectorSize = 10, batches = 210;
auto probeVectors = makeBatches(batches, [&](int32_t /*unused*/) {
return makeRowVector({
// Join Key is double -> VectorHasher::typeKindSupportsValueIds will
// return false -> HashMode is kHash
makeFlatVector<double>(
vectorSize, [&](vector_size_t /*row*/) { return rand(); }),
makeFlatVector<int64_t>(
vectorSize, [&](vector_size_t /*row*/) { return rand(); }),
});
});
auto buildVectors = makeBatches(batches, [&](int32_t batch) {
return makeRowVector({
makeFlatVector<double>(
vectorSize, [&](vector_size_t /*row*/) { return rand(); }),
makeFlatVector<int64_t>(
vectorSize, [&](vector_size_t /*row*/) { return rand(); }),
});
});
createDuckDbTable("t", probeVectors);
createDuckDbTable("u", buildVectors);
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
auto plan = PlanBuilder(planNodeIdGenerator)
.values(probeVectors)
.project({"c0 AS t0", "c1 AS t1"})
.hashJoin(
{"t0"},
{"u0"},
PlanBuilder(planNodeIdGenerator)
.values(buildVectors)
.project({"c0 AS u0", "c1 AS u1"})
.planNode(),
"",
{"t0", "t1", "match"},
core::JoinType::kLeftSemiProject)
.planNode();
HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
.config(core::QueryConfig::kAbandonBuildNoDupHashMinRows, "10")
.config(core::QueryConfig::kAbandonBuildNoDupHashMinPct, "50")
.numDrivers(1)
.checkSpillStats(false)
.planNode(plan)
.referenceQuery(
"SELECT t.c0, t.c1, EXISTS (SELECT * FROM u WHERE t.c0 = u.c0) FROM t")
.run();
}
This test case will not pass without resetting the capacity_ variable.
@XinShuoWang Thank you for your review. I understand that when "abandon" is triggered, the capacity_ is not accurate and needs to be reset. I will make the necessary modifications and incorporate your test cases.
As discussed in the issue, during the hash build process, left semi join and anti join can deduplicate the input rows based on the join key. However, Velox's hash build addInput process adds all inputs to the RowContainer, which can result in significant memory wastage in certain scenarios, such as TPCDS Q14 and Q95. To address this, we can construct the hash table directly during data input and use the existing allowDuplicates parameter of the HashTable to drop duplicate data without storing it in the RowContainer. This process is similar to constructing a hash table during hash aggregation.
Because Velox's hash build may execute with multiple drivers, duplicate data can only be removed within each individual driver's input in that scenario. In single-driver execution mode, however, it is possible to remove all duplicate data.
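As a concrete illustration of the multi-driver case (a self-contained sketch, not Velox code): each driver can only de-duplicate the rows it receives, so the same key may survive in several per-driver containers, and cross-driver duplicates are removed only when the tables are merged at the end.

#include <cstdint>
#include <unordered_set>
#include <vector>

using Keys = std::vector<int64_t>;

// Per-driver de-duplication: removes duplicates within one driver's input only.
std::unordered_set<int64_t> dedupPerDriver(const Keys& driverInput) {
  return std::unordered_set<int64_t>(driverInput.begin(), driverInput.end());
}

// Final merge: duplicates that appear across drivers collapse only here,
// which is why the multi-driver path still re-builds the combined table.
std::unordered_set<int64_t> mergeDrivers(
    const std::vector<std::unordered_set<int64_t>>& perDriver) {
  std::unordered_set<int64_t> merged;
  for (const auto& keys : perDriver) {
    merged.insert(keys.begin(), keys.end());
  }
  return merged;
}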