Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Build hash table while adding input rows for left semi and anti join #7066

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion velox/common/memory/tests/SharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ DEBUG_ONLY_TEST_P(
folly::EventCount taskPauseWait;
auto taskPauseWaitKey = taskPauseWait.prepareWait();

const auto fakeAllocationSize = kMemoryCapacity - (32L << 20);
const auto fakeAllocationSize = kMemoryCapacity - (2L << 20);

std::atomic<bool> injectAllocationOnce{true};
fakeOperatorFactory_->setAllocationCallback([&](Operator* op) {
Expand Down
14 changes: 14 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ class QueryConfig {
static constexpr const char* kAbandonPartialTopNRowNumberMinPct =
"abandon_partial_topn_row_number_min_pct";

static constexpr const char* kAbandonBuildNoDupHashMinRows =
"abandon_build_no_dup_hash_min_rows";

static constexpr const char* kAbandonBuildNoDupHashMinPct =
"abandon_build_no_dup_hash_min_pct";

static constexpr const char* kMaxPartitionedOutputBufferSize =
"max_page_partitioning_buffer_size";

Expand Down Expand Up @@ -454,6 +460,14 @@ class QueryConfig {
return get<int32_t>(kAbandonPartialTopNRowNumberMinPct, 80);
}

int32_t abandonBuildNoDupHashMinRows() const {
return get<int32_t>(kAbandonBuildNoDupHashMinRows, 100'000);
}

int32_t abandonBuildNoDupHashMinPct() const {
return get<int32_t>(kAbandonBuildNoDupHashMinPct, 80);
}

uint64_t maxSpillRunRows() const {
static constexpr uint64_t kDefault = 12UL << 20;
return get<uint64_t>(kMaxSpillRunRows, kDefault);
Expand Down
73 changes: 57 additions & 16 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ HashBuild::HashBuild(
joinBridge_(operatorCtx_->task()->getHashJoinBridgeLocked(
operatorCtx_->driverCtx()->splitGroupId,
planNodeId())),
keyChannelMap_(joinNode_->rightKeys().size()) {
keyChannelMap_(joinNode_->rightKeys().size()),
abandonBuildNoDupHashMinRows_(
driverCtx->queryConfig().abandonBuildNoDupHashMinRows()),
abandonBuildNoDupHashMinPct_(
driverCtx->queryConfig().abandonBuildNoDupHashMinPct()) {
VELOX_CHECK(pool()->trackUsage());
VELOX_CHECK_NOT_NULL(joinBridge_);

Expand All @@ -91,9 +95,11 @@ HashBuild::HashBuild(
types.emplace_back(inputType->childAt(channel));
}

dropDuplicates_ = canDropDuplicates(joinNode_);

// Identify the non-key build side columns and make a decoder for each.
const int32_t numDependents = inputType->size() - numKeys;
if (numDependents > 0) {
if (!dropDuplicates_ && numDependents > 0) {
// Number of join keys (numKeys) may be greater than the number of input columns
// (inputType->size()). In this case numDependents is negative and cannot be
// used to call 'reserve'. This happens when we join different probe side
Expand All @@ -102,12 +108,16 @@ HashBuild::HashBuild(
dependentChannels_.reserve(numDependents);
decoders_.reserve(numDependents);
}
for (auto i = 0; i < inputType->size(); ++i) {
if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
dependentChannels_.emplace_back(i);
decoders_.emplace_back(std::make_unique<DecodedVector>());
names.emplace_back(inputType->nameOf(i));
types.emplace_back(inputType->childAt(i));
if (!dropDuplicates_) {
// For left semi and anti join with no extra filter, hash table does not
// store dependent columns.
for (auto i = 0; i < inputType->size(); ++i) {
if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
dependentChannels_.emplace_back(i);
decoders_.emplace_back(std::make_unique<DecodedVector>());
names.emplace_back(inputType->nameOf(i));
types.emplace_back(inputType->childAt(i));
}
}
}

Expand Down Expand Up @@ -155,11 +165,6 @@ void HashBuild::setupTable() {
.minTableRowsForParallelJoinBuild(),
pool());
} else {
// (Left) semi and anti join with no extra filter only needs to know whether
// there is a match. Hence, no need to store entries with duplicate keys.
const bool dropDuplicates = !joinNode_->filter() &&
(joinNode_->isLeftSemiFilterJoin() ||
joinNode_->isLeftSemiProjectJoin() || isAntiJoin(joinType_));
// Right semi join needs to tag build rows that were probed.
const bool needProbedFlag = joinNode_->isRightSemiFilterJoin();
if (isLeftNullAwareJoinWithFilter(joinNode_)) {
Expand All @@ -168,7 +173,7 @@ void HashBuild::setupTable() {
table_ = HashTable<false>::createForJoin(
std::move(keyHashers),
dependentTypes,
!dropDuplicates, // allowDuplicates
!dropDuplicates_, // allowDuplicates
needProbedFlag, // hasProbedFlag
operatorCtx_->driverCtx()
->queryConfig()
Expand All @@ -179,14 +184,16 @@ void HashBuild::setupTable() {
table_ = HashTable<true>::createForJoin(
std::move(keyHashers),
dependentTypes,
!dropDuplicates, // allowDuplicates
!dropDuplicates_, // allowDuplicates
needProbedFlag, // hasProbedFlag
operatorCtx_->driverCtx()
->queryConfig()
.minTableRowsForParallelJoinBuild(),
pool());
}
}
lookup_ = std::make_unique<HashLookup>(table_->hashers());
lookup_->reset(1);
analyzeKeys_ = table_->hashMode() != BaseHashTable::HashMode::kHash;
}

Expand Down Expand Up @@ -381,6 +388,32 @@ void HashBuild::addInput(RowVectorPtr input) {
return;
}

numInputRows_ += activeRows_.countSelected();

if (dropDuplicates_ && !abandonBuildNoDupHash_) {
const bool abandonEarly = abandonBuildNoDupHashEarly(table_->numDistinct());
if (abandonEarly) {
// The hash table is no longer directly constructed in addInput. The data
// that was previously inserted into the hash table is already in the
// RowContainer.
addRuntimeStat("abandonBuildNoDupHash", RuntimeCounter(1));
abandonBuildNoDupHash_ = true;
table_->joinTableMayHaveDuplicates();
} else {
table_->prepareForGroupProbe(
*lookup_,
input,
activeRows_,
BaseHashTable::kNoSpillInputStartPartitionBit);
if (lookup_->rows.empty()) {
return;
}
table_->groupProbe(
*lookup_, BaseHashTable::kNoSpillInputStartPartitionBit);
return;
}
}

if (analyzeKeys_ && hashes_.size() < activeRows_.end()) {
hashes_.resize(activeRows_.end());
}
Expand Down Expand Up @@ -771,7 +804,8 @@ bool HashBuild::finishHashBuild() {
isInputFromSpill() ? spillConfig()->startPartitionBit
: BaseHashTable::kNoSpillInputStartPartitionBit,
allowParallelJoinBuild ? operatorCtx_->task()->queryCtx()->executor()
: nullptr);
: nullptr,
dropDuplicates_);
}
stats_.wlock()->addRuntimeStat(
BaseHashTable::kBuildWallNanos,
Expand Down Expand Up @@ -869,6 +903,7 @@ void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) {
setupTable();
setupSpiller(spillInput.spillPartition.get());
stateCleared_ = false;
numInputRows_ = 0;

// Start to process spill input.
processSpillInput();
Expand Down Expand Up @@ -1171,4 +1206,10 @@ void HashBuild::close() {
table_.reset();
}
}

// Decides whether to stop eagerly building the deduplicating hash table in
// addInput(): once enough input rows have been observed and most of them
// turned out to be distinct, deduplication at build time no longer pays off.
bool HashBuild::abandonBuildNoDupHashEarly(int64_t numDistinct) const {
  VELOX_CHECK(dropDuplicates_);
  // Too little input seen so far to make a reliable decision.
  if (numInputRows_ <= abandonBuildNoDupHashMinRows_) {
    return false;
  }
  // Give up when the share of distinct rows reaches the configured threshold.
  const int64_t distinctPct = 100 * numDistinct / numInputRows_;
  return distinctPct >= abandonBuildNoDupHashMinPct_;
}
} // namespace facebook::velox::exec
22 changes: 22 additions & 0 deletions velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ class HashBuild final : public Operator {
// not.
bool nonReclaimableState() const;

// True if we have enough rows and not enough duplicate join keys, i.e. more
// than 'abandonBuildNoDupHashMinRows_' rows and at least
// 'abandonBuildNoDupHashMinPct_' % of rows are unique.
bool abandonBuildNoDupHashEarly(int64_t numDistinct) const;

const std::shared_ptr<const core::HashJoinNode> joinNode_;

const core::JoinType joinType_;
Expand Down Expand Up @@ -237,6 +242,7 @@ class HashBuild final : public Operator {

// Container for the rows being accumulated.
std::unique_ptr<BaseHashTable> table_;
std::unique_ptr<HashLookup> lookup_;

// Key channels in 'input_'
std::vector<column_index_t> keyChannels_;
Expand Down Expand Up @@ -265,6 +271,11 @@ class HashBuild final : public Operator {
// at least one entry with null join keys.
bool joinHasNullKeys_{false};

// Indicates whether to drop duplicate rows. Rows containing duplicate keys
// can be removed for left semi and anti joins.
bool dropDuplicates_{false};
bool abandonBuildNoDupHash_{false};

// The type used to spill hash table which might attach a boolean column to
// record the probed flag if 'needProbedFlagSpill_' is true.
RowTypePtr spillType_;
Expand Down Expand Up @@ -303,6 +314,17 @@ class HashBuild final : public Operator {

// Maps key channel in 'input_' to channel in key.
folly::F14FastMap<column_index_t, column_index_t> keyChannelMap_;

// Count the number of input rows.
int64_t numInputRows_ = 0;

// Minimum number of rows to see before deciding to give up building the
// no-duplicates hash table.
const int32_t abandonBuildNoDupHashMinRows_;
// Minimum percentage of unique rows at which building the no-duplicates hash
// table is given up. If at least this percentage of rows is unique, building
// the hash table in the addInput phase is not worthwhile.
const int32_t abandonBuildNoDupHashMinPct_;
};

inline std::ostream& operator<<(std::ostream& os, HashBuild::State state) {
Expand Down
9 changes: 9 additions & 0 deletions velox/exec/HashJoinBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,15 @@ bool isLeftNullAwareJoinWithFilter(
joinNode->isNullAware() && (joinNode->filter() != nullptr);
}

// Returns true when the hash build for 'joinNode' may discard rows whose join
// keys duplicate an already-stored row. A (left) semi or anti join without an
// extra filter only needs to know whether a match exists, so keeping one row
// per distinct key is sufficient.
bool canDropDuplicates(
    const std::shared_ptr<const core::HashJoinNode>& joinNode) {
  if (joinNode->filter() != nullptr) {
    // A filter may need to inspect every build-side row, so duplicates must
    // be retained.
    return false;
  }
  return joinNode->isLeftSemiFilterJoin() ||
      joinNode->isLeftSemiProjectJoin() || joinNode->isAntiJoin();
}

uint64_t HashJoinMemoryReclaimer::reclaim(
memory::MemoryPool* pool,
uint64_t targetBytes,
Expand Down
5 changes: 5 additions & 0 deletions velox/exec/HashJoinBridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ class HashJoinBridge : public JoinBridge {
bool isLeftNullAwareJoinWithFilter(
const std::shared_ptr<const core::HashJoinNode>& joinNode);

// Indicates if 'joinNode' can drop duplicate rows with the same join key. For
// left semi and anti joins, it is not necessary to store duplicate rows.
bool canDropDuplicates(
const std::shared_ptr<const core::HashJoinNode>& joinNode);

class HashJoinMemoryReclaimer final : public MemoryReclaimer {
public:
static std::unique_ptr<memory::MemoryReclaimer> create() {
Expand Down
15 changes: 10 additions & 5 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1979,13 +1979,18 @@ void HashProbe::prepareTableSpill(
names.emplace_back(tableInputType->nameOf(channel));
types.emplace_back(tableInputType->childAt(channel));
}
const auto numDependents = tableInputType->size() - numKeys;
for (auto i = 0; i < tableInputType->size(); ++i) {
if (keyChannelMap.find(i) == keyChannelMap.end()) {
names.emplace_back(tableInputType->nameOf(i));
types.emplace_back(tableInputType->childAt(i));
if (!canDropDuplicates(joinNode_)) {
// For left semi and anti join with no extra filter, hash table does not
// store dependent columns.
const auto numDependents = tableInputType->size() - numKeys;
for (auto i = 0; i < tableInputType->size(); ++i) {
if (keyChannelMap.find(i) == keyChannelMap.end()) {
names.emplace_back(tableInputType->nameOf(i));
types.emplace_back(tableInputType->childAt(i));
}
}
}

tableSpillType_ = hashJoinTableSpillType(
ROW(std::move(names), std::move(types)), joinType_);
}
Expand Down
31 changes: 28 additions & 3 deletions velox/exec/HashTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ HashTable<ignoreNullKeys>::HashTable(
memory::MemoryPool* pool)
: BaseHashTable(std::move(hashers)),
minTableSizeForParallelJoinBuild_(minTableSizeForParallelJoinBuild),
isJoinBuild_(isJoinBuild) {
isJoinBuild_(isJoinBuild),
joinBuildNoDuplicates_(!allowDuplicates) {
std::vector<TypePtr> keys;
for (auto& hasher : hashers_) {
keys.push_back(hasher->type());
Expand Down Expand Up @@ -1512,7 +1513,9 @@ void HashTable<ignoreNullKeys>::decideHashMode(
return;
}
disableRangeArrayHash_ |= disableRangeArrayHash;
if (numDistinct_ && !isJoinBuild_) {
if (numDistinct_ && (!isJoinBuild_ || joinBuildNoDuplicates_)) {
// For left semi and anti joins duplicates are dropped (the table was created
// with allowDuplicates == false), and the join build constructs the hash
// table while adding input rows.
if (!analyze()) {
setHashMode(HashMode::kHash, numNew, spillInputStartPartitionBit);
return;
Expand Down Expand Up @@ -1731,8 +1734,21 @@ template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::prepareJoinTable(
std::vector<std::unique_ptr<BaseHashTable>> tables,
int8_t spillInputStartPartitionBit,
folly::Executor* executor) {
folly::Executor* executor,
bool dropDuplicates) {
buildExecutor_ = executor;
if (dropDuplicates) {
if (table_ != nullptr) {
// Set table_ to nullptr to trigger rehash.
rows_->pool()->freeContiguous(tableAllocation_);
table_ = nullptr;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to clean up capacity_ and hashMode_ variables?

Copy link
Contributor Author

@liujiayi771 liujiayi771 Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to clean up other variables. Setting table_ = nullptr here is to allow entering the rehash process in the checkSize, and it is not intended for data cleanup.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Regression test: the table's capacity bookkeeping must be reset when the
// build abandons eager deduplication mid-stream.
TEST_F(HashJoinTest, semiJoinDeduplicateResetCapacity) {
  // The initial size of HashTable is 2048. After seeing
  // kAbandonBuildNoDupHashMinRows input rows, the build finds the duplication
  // rate lower than kAbandonBuildNoDupHashMinPct, so it stops inserting into
  // the HashTable (the capacity_ variable no longer changes afterwards).
  // However, rows keep accumulating in the RowContainer, so numDistinct_
  // grows beyond 2048 and, without resetting capacity_, an error is
  // eventually reported in HashTable<ignoreNullKeys>::checkSize.
  const int vectorSize = 10, batches = 210;
  auto probeVectors = makeBatches(batches, [&](int32_t /*unused*/) {
    return makeRowVector({
        // Join key is double -> VectorHasher::typeKindSupportsValueIds
        // returns false -> HashMode is kHash.
        makeFlatVector<double>(
            vectorSize, [&](vector_size_t /*row*/) { return rand(); }),
        makeFlatVector<int64_t>(
            vectorSize, [&](vector_size_t /*row*/) { return rand(); }),
    });
  });

  // Build side has the same shape as the probe side; 'batch' is unused.
  auto buildVectors = makeBatches(batches, [&](int32_t batch) {
    return makeRowVector({
        makeFlatVector<double>(
            vectorSize, [&](vector_size_t /*row*/) { return rand(); }),
        makeFlatVector<int64_t>(
            vectorSize, [&](vector_size_t /*row*/) { return rand(); }),
    });
  });

  createDuckDbTable("t", probeVectors);
  createDuckDbTable("u", buildVectors);

  // Left semi project join on the double key triggers the drop-duplicates
  // build path exercised by this test.
  auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
  auto plan = PlanBuilder(planNodeIdGenerator)
                  .values(probeVectors)
                  .project({"c0 AS t0", "c1 AS t1"})
                  .hashJoin(
                      {"t0"},
                      {"u0"},
                      PlanBuilder(planNodeIdGenerator)
                          .values(buildVectors)
                          .project({"c0 AS u0", "c1 AS u1"})
                          .planNode(),
                      "",
                      {"t0", "t1", "match"},
                      core::JoinType::kLeftSemiProject)
                  .planNode();

  // Low thresholds force the abandon path early with this small input.
  HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
      .config(core::QueryConfig::kAbandonBuildNoDupHashMinRows, "10")
      .config(core::QueryConfig::kAbandonBuildNoDupHashMinPct, "50")
      .numDrivers(1)
      .checkSpillStats(false)
      .planNode(plan)
      .referenceQuery(
          "SELECT t.c0, t.c1, EXISTS (SELECT * FROM u WHERE t.c0 = u.c0) FROM t")
      .run();
}

This case will not work without resetting the capacity variable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@XinShuoWang Thank you for your review. I understand that when "abandon" is triggered, the capacity_ is not accurate and needs to be reset. I will make the necessary modifications and incorporate your test cases.

}
// Call analyze to insert all unique values in row container to the
// table hashers' uniqueValues_;
if (!analyze()) {
setHashMode(HashMode::kHash, 0, spillInputStartPartitionBit);
}
}
otherTables_.reserve(tables.size());
for (auto& table : tables) {
otherTables_.emplace_back(std::unique_ptr<HashTable<ignoreNullKeys>>(
Expand Down Expand Up @@ -1762,6 +1778,15 @@ void HashTable<ignoreNullKeys>::prepareJoinTable(
}
if (useValueIds) {
for (auto& other : otherTables_) {
if (dropDuplicates) {
// Before merging with the current hashers, all values in the row
// containers of other table need to be inserted into uniqueValues_.
if (!other->analyze()) {
other->setHashMode(HashMode::kHash, 0, spillInputStartPartitionBit);
useValueIds = false;
break;
}
}
for (auto i = 0; i < hashers_.size(); ++i) {
hashers_[i]->merge(*other->hashers_[i]);
if (!hashers_[i]->mayUseValueIds()) {
Expand Down
Loading
Loading