Stream input row to hash table when addInput for left semi and anti join
liujiayi771 committed Dec 21, 2023
1 parent e361a50 commit 3a97b3c
Showing 5 changed files with 41 additions and 13 deletions.
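
The change rests on a simple property of these join types: a left semi or anti join without an extra filter only asks whether a matching key exists on the build side, so build rows with duplicate keys add nothing and can be dropped as they arrive. A minimal standalone C++ sketch of that property (illustrative only, not Velox code; all names below are made up):

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Keep the probe keys that have at least one match on the build side.
std::vector<int64_t> leftSemi(
    const std::vector<int64_t>& probeKeys,
    const std::vector<int64_t>& buildKeys) {
  std::vector<int64_t> out;
  for (auto k : probeKeys) {
    if (std::find(buildKeys.begin(), buildKeys.end(), k) != buildKeys.end()) {
      out.push_back(k);
    }
  }
  return out;
}

int main() {
  const std::vector<int64_t> probe = {1, 2, 5};
  const std::vector<int64_t> buildWithDups = {2, 2, 3, 3, 3};
  const std::vector<int64_t> buildDeduped = {2, 3};
  // Identical output either way: only key presence matters, so duplicate
  // build rows add nothing for left semi (and, symmetrically, anti) joins.
  std::cout << (leftSemi(probe, buildWithDups) == leftSemi(probe, buildDeduped))
            << std::endl; // prints 1
  return 0;
}

Because the result depends only on the set of distinct build keys, the build side can store one entry per key and insert keys while input batches are still streaming in, which is what the HashBuild changes below do.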
20 changes: 17 additions & 3 deletions velox/exec/HashBuild.cpp
@@ -148,6 +148,7 @@ void HashBuild::setupTable() {
   if (joinNode_->isRightJoin() || joinNode_->isFullJoin() ||
       joinNode_->isRightSemiProjectJoin()) {
     // Do not ignore null keys.
+    ignoreNullKeys_ = false;
     table_ = HashTable<false>::createForJoin(
         std::move(keyHashers),
         dependentTypes,
@@ -160,36 +161,43 @@ void HashBuild::setupTable() {
   } else {
     // (Left) semi and anti join with no extra filter only needs to know whether
     // there is a match. Hence, no need to store entries with duplicate keys.
-    const bool dropDuplicates = !joinNode_->filter() &&
+    dropDuplicates_ = !joinNode_->filter() &&
         (joinNode_->isLeftSemiFilterJoin() ||
          joinNode_->isLeftSemiProjectJoin() || isAntiJoin(joinType_));
+    if (dropDuplicates_) {
+      // Left semi and anti join do not require storing non-join key columns.
+      dependentTypes = std::vector<TypePtr>{};
+    }
     // Right semi join needs to tag build rows that were probed.
     const bool needProbedFlag = joinNode_->isRightSemiFilterJoin();
     if (isLeftNullAwareJoinWithFilter(joinNode_)) {
       // We need to check null key rows in build side in case of null-aware anti
       // or left semi project join with filter set.
+      ignoreNullKeys_ = false;
       table_ = HashTable<false>::createForJoin(
           std::move(keyHashers),
           dependentTypes,
-          !dropDuplicates, // allowDuplicates
+          !dropDuplicates_, // allowDuplicates
           needProbedFlag, // hasProbedFlag
           operatorCtx_->driverCtx()
               ->queryConfig()
               .minTableRowsForParallelJoinBuild(),
           pool());
     } else {
       // Ignore null keys
+      ignoreNullKeys_ = true;
       table_ = HashTable<true>::createForJoin(
           std::move(keyHashers),
           dependentTypes,
-          !dropDuplicates, // allowDuplicates
+          !dropDuplicates_, // allowDuplicates
           needProbedFlag, // hasProbedFlag
           operatorCtx_->driverCtx()
               ->queryConfig()
               .minTableRowsForParallelJoinBuild(),
           pool());
     }
   }
+  lookup_ = std::make_unique<HashLookup>(table_->hashers());
   analyzeKeys_ = table_->hashMode() != BaseHashTable::HashMode::kHash;
 }
 
@@ -381,6 +389,12 @@ void HashBuild::addInput(RowVectorPtr input) {
     return;
   }
 
+  if (dropDuplicates_) {
+    table_->prepareForGroupProbe(*lookup_, input, activeRows_, ignoreNullKeys_);
+    table_->groupProbe(*lookup_);
+    return;
+  }
+
   if (analyzeKeys_ && hashes_.size() < activeRows_.end()) {
     hashes_.resize(activeRows_.end());
   }
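With dropDuplicates_ set, addInput() now folds each arriving batch into the hash table through the group-probe path (prepareForGroupProbe() followed by groupProbe()) and returns early, skipping the rest of addInput()'s buffering logic; this is also why setupTable() now creates lookup_. A self-contained sketch of the effect on stored rows, with std::unordered_set standing in for the hash table (illustrative data and names, not Velox code):

#include <cstdint>
#include <iostream>
#include <unordered_set>
#include <vector>

int main() {
  // Three build-side batches with many repeated keys.
  const std::vector<std::vector<int64_t>> batches = {
      {1, 1, 2, 3}, {3, 3, 4}, {4, 4, 4, 5}};

  // Buffered build (other join types): every row is kept until the end.
  size_t bufferedRows = 0;
  for (const auto& batch : batches) {
    bufferedRows += batch.size();
  }

  // Streaming dedup build (left semi / anti without filter): each batch is
  // folded into the table as it arrives, so duplicates never occupy memory.
  std::unordered_set<int64_t> distinctKeys;
  for (const auto& batch : batches) {
    distinctKeys.insert(batch.begin(), batch.end());
  }

  std::cout << "buffered rows: " << bufferedRows << "\n";               // 11
  std::cout << "distinct keys stored: " << distinctKeys.size() << "\n"; // 5
  return 0;
}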
9 changes: 9 additions & 0 deletions velox/exec/HashBuild.h
@@ -267,6 +267,8 @@ class HashBuild final : public Operator {
   // Container for the rows being accumulated.
   std::unique_ptr<BaseHashTable> table_;
 
+  std::unique_ptr<HashLookup> lookup_;
+
   // Key channels in 'input_'
   std::vector<column_index_t> keyChannels_;
 
@@ -294,6 +296,13 @@
   // at least one entry with null join keys.
   bool joinHasNullKeys_{false};
 
+  // Indicates whether the hash table ignore null keys.
+  bool ignoreNullKeys_{false};
+
+  // Indicates whether drop duplicate rows. Rows containing duplicate keys
+  // can be removed for left semi and anti join.
+  bool dropDuplicates_{false};
+
   // Counts input batches and triggers spilling if folly hash of this % 100 <=
   // 'testSpillPct_';.
   uint64_t spillTestCounter_{0};
7 changes: 5 additions & 2 deletions velox/exec/HashTable.cpp
@@ -54,7 +54,8 @@ HashTable<ignoreNullKeys>::HashTable(
     const std::shared_ptr<velox::HashStringAllocator>& stringArena)
     : BaseHashTable(std::move(hashers)),
       minTableSizeForParallelJoinBuild_(minTableSizeForParallelJoinBuild),
-      isJoinBuild_(isJoinBuild) {
+      isJoinBuild_(isJoinBuild),
+      allowDuplicates_(allowDuplicates) {
   std::vector<TypePtr> keys;
   for (auto& hasher : hashers_) {
     keys.push_back(hasher->type());
@@ -1406,7 +1407,9 @@ void HashTable<ignoreNullKeys>::decideHashMode(
     return;
   }
   disableRangeArrayHash_ |= disableRangeArrayHash;
-  if (numDistinct_ && !isJoinBuild_) {
+  if (numDistinct_ && (!isJoinBuild_ || !allowDuplicates_)) {
+    // If the join type is left semi and anti, allowDuplicates_ will be false,
+    // and join build is building hash table while adding input rows.
     if (!analyze()) {
       setHashMode(HashMode::kHash, numNew);
       return;
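The decideHashMode() change follows from the same streaming build: once duplicates are dropped, a join build populates the table while adding input rows, much like a group-by table, so it analyzes the keys it has seen to pick a hash mode instead of deferring that work to the end of the build. A rough standalone illustration of why key statistics drive the mode choice (the two-mode enum, threshold, and types are invented; Velox's actual logic in HashTable.cpp is more involved):

#include <cstdint>
#include <iostream>
#include <set>

enum class HashMode { kArray, kHash };

// If every distinct key falls into a small, dense range, a directly addressed
// array is cheaper than a generic hash table; otherwise fall back to hashing.
HashMode decideHashMode(const std::set<int64_t>& distinctKeys, int64_t maxArrayRange) {
  if (distinctKeys.empty()) {
    return HashMode::kHash;
  }
  const int64_t range = *distinctKeys.rbegin() - *distinctKeys.begin() + 1;
  return range <= maxArrayRange ? HashMode::kArray : HashMode::kHash;
}

int main() {
  std::cout << (decideHashMode({1, 2, 3, 7}, 1024) == HashMode::kArray) << "\n";    // 1
  std::cout << (decideHashMode({1, 1000000000}, 1024) == HashMode::kArray) << "\n"; // 0
  return 0;
}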
3 changes: 2 additions & 1 deletion velox/exec/HashTable.h
@@ -825,7 +825,7 @@ class HashTable : public BaseHashTable {
   // or distinct mode VectorHashers in a group by hash table. 0 for
   // join build sides.
   int32_t reservePct() const {
-    return isJoinBuild_ ? 0 : 50;
+    return (isJoinBuild_ && allowDuplicates_) ? 0 : 50;
   }
 
   // Returns the byte offset of the bucket for 'hash' starting from 'table_'.
@@ -886,6 +886,7 @@
 
   int8_t sizeBits_;
   bool isJoinBuild_ = false;
+  bool allowDuplicates_ = false;
 
   // Set at join build time if the table has duplicates, meaning that
   // the join can be cardinality increasing. Atomic for tsan because
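reservePct() changes for the same reason: a deduplicating join build grows incrementally like a group-by table, so it keeps the 50% reservation rather than the 0% used for join builds that buffer all rows before building. A hedged sketch of how such a reserve percentage could feed capacity planning (illustrative arithmetic only; this is not Velox's actual sizing code):

#include <cstdint>
#include <iostream>

int64_t plannedCapacity(int64_t numDistinct, int32_t reservePct) {
  // Leave reservePct% headroom above the current distinct-row count so a table
  // that grows batch by batch does not have to rehash on every small addition.
  return numDistinct + numDistinct * reservePct / 100;
}

int main() {
  std::cout << plannedCapacity(1000, 0) << "\n";  // join build with duplicates: 1000
  std::cout << plannedCapacity(1000, 50) << "\n"; // group-by or dedup build: 1500
  return 0;
}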
15 changes: 8 additions & 7 deletions velox/exec/tests/SharedArbitratorTest.cpp
@@ -346,8 +346,8 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {
     numAddedPools_ = 0;
   }
 
-  RowVectorPtr newVector() {
-    VectorFuzzer fuzzer(fuzzerOpts_, pool());
+  RowVectorPtr newVector(size_t seed = 123456) {
+    VectorFuzzer fuzzer(fuzzerOpts_, pool(), seed);
     return fuzzer.fuzzRow(rowType_);
   }
 
@@ -1565,7 +1565,8 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimFromJoinBuilder) {
   const int numVectors = 32;
   std::vector<RowVectorPtr> vectors;
   for (int i = 0; i < numVectors; ++i) {
-    vectors.push_back(newVector());
+    auto seed = folly::Random::rand32();
+    vectors.push_back(newVector(seed));
   }
   createDuckDbTable(vectors);
   std::vector<bool> sameQueries = {false, true};
@@ -1586,7 +1587,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimFromJoinBuilder) {
   std::atomic_bool taskPauseDone{false};
   folly::EventCount taskPauseWait;
 
-  const auto joinMemoryUsage = 32L << 20;
+  const auto joinMemoryUsage = 20L << 20;
   const auto fakeAllocationSize = kMemoryCapacity - joinMemoryUsage / 2;
 
   std::atomic<bool> injectAllocationOnce{true};
@@ -1646,8 +1647,8 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimFromJoinBuilder) {
             .values(vectors)
             .project({"c0 AS t0", "c1 AS t1", "c2 AS t2"})
            .hashJoin(
-                {"t0"},
-                {"u0"},
+                {"t2"},
+                {"u2"},
                 PlanBuilder(planNodeIdGenerator)
                     .values(vectors)
                     .project({"c0 AS u0", "c1 AS u1", "c2 AS u2"})
@@ -1706,7 +1707,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) {
   folly::EventCount taskPauseWait;
   auto taskPauseWaitKey = taskPauseWait.prepareWait();
 
-  const auto fakeAllocationSize = kMemoryCapacity - (32L << 20);
+  const auto fakeAllocationSize = kMemoryCapacity - (2L << 20);
 
   std::atomic<bool> injectAllocationOnce{true};
   fakeOperatorFactory_->setAllocationCallback([&](Operator* op) {
