Fix HashBuild unspilling stuck #8715

Closed · wants to merge 1 commit
2 changes: 1 addition & 1 deletion velox/core/QueryConfig.h
@@ -578,7 +578,7 @@ class QueryConfig {
   /// calculate the spilling partition number for join spill or aggregation
   /// spill.
   uint8_t spillStartPartitionBit() const {
-    constexpr uint8_t kDefaultStartBit = 29;
+    constexpr uint8_t kDefaultStartBit = 48;
     return get<uint8_t>(kSpillStartPartitionBit, kDefaultStartBit);
   }
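The key change: the default spill start partition bit moves from 29 to 48, which places the spill partition bits above any bits a realistically sized hash table draws its bucket index from. As a rough sketch of the bit layout (an illustrative helper, not Velox's API; the name spillPartition and the startBit/numBits parameters are assumptions mirroring the kSpillStartPartitionBit and kJoinSpillPartitionBits configs):

#include <cstdint>

// Illustrative only: a spiller in this scheme derives the partition id from
// 'numBits' hash bits starting at 'startBit', while the hash table takes its
// bucket index from the low bits of the same hash. With startBit = 48, the
// two ranges cannot overlap unless the table needs more than 2^48 buckets.
inline uint32_t spillPartition(uint64_t hash, uint8_t startBit, uint8_t numBits) {
  return static_cast<uint32_t>(hash >> startBit) & ((1u << numBits) - 1);
}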
7 changes: 6 additions & 1 deletion velox/exec/GroupingSet.cpp
@@ -224,7 +224,12 @@ void GroupingSet::addInputForActiveRows(
   TestValue::adjust(
       "facebook::velox::exec::GroupingSet::addInputForActiveRows", this);

-  table_->prepareForGroupProbe(*lookup_, input, activeRows_, ignoreNullKeys_);
+  table_->prepareForGroupProbe(
+      *lookup_,
+      input,
+      activeRows_,
+      ignoreNullKeys_,
+      BaseHashTable::kNoSpillInputStartPartitionBit);
   if (lookup_->rows.empty()) {
     // No rows to probe. Can happen when ignoreNullKeys_ is true and all rows
     // have null keys.
4 changes: 3 additions & 1 deletion velox/exec/HashBuild.cpp
@@ -823,7 +823,9 @@ bool HashBuild::finishHashBuild() {
   table_->prepareJoinTable(
       std::move(otherTables),
       allowParallelJoinBuild ? operatorCtx_->task()->queryCtx()->executor()
-                             : nullptr);
+                             : nullptr,
+      isInputFromSpill() ? spillConfig()->startPartitionBit
+                         : BaseHashTable::kNoSpillInputStartPartitionBit);
   addRuntimeStats();
   if (joinBridge_->setHashTable(
           std::move(table_), std::move(spillPartitions), joinHasNullKeys_)) {
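This call site is the heart of the fix: when the build side is restoring spilled rows (isInputFromSpill()), every incoming row belongs to a single spill partition, so all of its hash bits at and above startPartitionBit are identical across the input. Passing that bit into prepareJoinTable lets the table confirm that its bucket-index bits lie strictly below the spill partition bits (the checkHashBitsOverlap call added in HashTable.cpp below).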
11 changes: 8 additions & 3 deletions velox/exec/HashTable.cpp
@@ -1607,7 +1607,8 @@ bool mayUseValueIds(const BaseHashTable& table) {
 template <bool ignoreNullKeys>
 void HashTable<ignoreNullKeys>::prepareJoinTable(
     std::vector<std::unique_ptr<BaseHashTable>> tables,
-    folly::Executor* executor) {
+    folly::Executor* executor,
+    int8_t spillInputStartPartitionBit) {
   buildExecutor_ = executor;
   otherTables_.reserve(tables.size());
   for (auto& table : tables) {
@@ -1650,6 +1651,7 @@ void HashTable<ignoreNullKeys>::prepareJoinTable(
   } else {
     decideHashMode(0);
   }
+  checkHashBitsOverlap(spillInputStartPartitionBit);
 }

 template <bool ignoreNullKeys>
@@ -1982,7 +1984,9 @@ void BaseHashTable::prepareForGroupProbe(
     HashLookup& lookup,
     const RowVectorPtr& input,
     SelectivityVector& rows,
-    bool ignoreNullKeys) {
+    bool ignoreNullKeys,
+    int8_t spillInputStartPartitionBit) {
+  checkHashBitsOverlap(spillInputStartPartitionBit);
   auto& hashers = lookup.hashers;

   for (auto& hasher : hashers) {
@@ -2015,7 +2019,8 @@ void BaseHashTable::prepareForGroupProbe(
     decideHashMode(input->size());
     // Do not forward 'ignoreNullKeys' to avoid redundant evaluation of
     // deselectRowsWithNulls.
-    prepareForGroupProbe(lookup, input, rows, false);
+    prepareForGroupProbe(
+        lookup, input, rows, false, spillInputStartPartitionBit);
     return;
   }
 }
44 changes: 35 additions & 9 deletions velox/exec/HashTable.h
@@ -121,6 +121,8 @@ class BaseHashTable {
   /// Specifies the hash mode of a table.
   enum class HashMode { kHash, kArray, kNormalizedKey };

+  static constexpr int8_t kNoSpillInputStartPartitionBit = -1;
+
   /// Returns the string of the given 'mode'.
   static std::string modeString(HashMode mode);

@@ -181,7 +183,8 @@ class BaseHashTable {
       HashLookup& lookup,
       const RowVectorPtr& input,
       SelectivityVector& rows,
-      bool ignoreNullKeys);
+      bool ignoreNullKeys,
+      int8_t spillInputStartPartitionBit);

   /// Finds or creates a group for each key in 'lookup'. The keys are
   /// returned in 'lookup.hits'.
@@ -248,7 +251,8 @@ class BaseHashTable {

   virtual void prepareJoinTable(
       std::vector<std::unique_ptr<BaseHashTable>> tables,
-      folly::Executor* executor = nullptr) = 0;
+      folly::Executor* executor = nullptr,
+      int8_t spillInputStartPartitionBit = kNoSpillInputStartPartitionBit) = 0;

   /// Returns the memory footprint in bytes for any data structures
   /// owned by 'this'.
@@ -328,7 +332,12 @@

   /// Extracts a 7 bit tag from a hash number. The high bit is always set.
   static uint8_t hashTag(uint64_t hash) {
-    return static_cast<uint8_t>(hash >> 32) | 0x80;
+    // This is likely all 0 for small key types (<= 32 bits). Not an issue
+    // because small types have a range that makes them normalized key cases.
+    // If there are multiple small type keys, they are mixed, which makes
+    // them a 64 bit hash. Normalized keys are mixed before being used as
+    // hash numbers.
+    return static_cast<uint8_t>(hash >> 38) | 0x80;
   }

   /// Loads a vector of tags for bulk comparison. Disables tsan errors
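With the shift moved from 32 to 38, the tag now comes from hash bits 38-44, which stay below the new default spill start bit of 48; rows restored from one spill partition agree on every bit at or above the start bit, so tags drawn from lower bits remain well distributed. The old shift of 32 could land inside the spill partition bit range once partitioning advanced past bit 32, which is presumably why it changed together with the new default.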
@@ -365,6 +374,20 @@

   virtual void setHashMode(HashMode mode, int32_t numNew) = 0;

+  virtual int sizeBits() const = 0;
+
+  // We don't want any overlap between the bit ranges used by the bucket index
+  // and those used by spill partitioning; otherwise, because we receive data
+  // from only one partition, the overlapped bits would be the same for every
+  // row and only a fraction of the buckets would be used. This would cause
+  // insertion to take a very long time and block driver threads.
+  void checkHashBitsOverlap(int8_t spillInputStartPartitionBit) {
+    if (spillInputStartPartitionBit != kNoSpillInputStartPartitionBit &&
+        hashMode() != HashMode::kArray) {
+      VELOX_CHECK_LE(sizeBits(), spillInputStartPartitionBit);
+    }
+  }
+
   std::vector<std::unique_ptr<VectorHasher>> hashers_;
   std::unique_ptr<RowContainer> rows_;
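To make the failure mode concrete, here is a small standalone sketch (illustrative numbers and names, not Velox code) of how many buckets stay reachable when bucket-index bits overlap spill partition bits:

#include <cstdint>
#include <iostream>

// Illustrative only. A table with 2^tableBits buckets takes its bucket index
// from hash bits [0, tableBits). Rows restored from one spill partition agree
// on every bit at or above startBit, so bucket-index bits in the overlap are
// frozen and the corresponding share of buckets can never be hit.
int main() {
  const int tableBits = 32; // sizeBits() == 32, i.e. ~4 billion buckets
  const int startBit = 29;  // old default: bits 29-31 overlap the index
  const int frozenBits = tableBits > startBit ? tableBits - startBit : 0;
  std::cout << "reachable buckets: 1 in " << (1ull << frozenBits) << "\n";
  // Prints "reachable buckets: 1 in 8": unspilled rows pile into 1/8 of the
  // buckets, collision chains grow, and the build appears stuck. With the
  // new default startBit = 48, frozenBits is 0 for any realistic table.
  return 0;
}

Array mode is exempt from the check, presumably because an array table is indexed directly by the key's value range rather than by hash bits.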

@@ -525,7 +548,9 @@ class HashTable : public BaseHashTable {
   // and VectorHashers and decides the hash mode and representation.
   void prepareJoinTable(
       std::vector<std::unique_ptr<BaseHashTable>> tables,
-      folly::Executor* executor = nullptr) override;
+      folly::Executor* executor = nullptr,
+      int8_t spillInputStartPartitionBit =
+          kNoSpillInputStartPartitionBit) override;

   uint64_t hashTableSizeIncrease(int32_t numNewDistinct) const override {
     if (numDistinct_ + numNewDistinct > rehashSize()) {
@@ -587,10 +612,6 @@
   // occupy exactly two (64 bytes) cache lines.
   class Bucket {
    public:
-    Bucket() {
-      static_assert(sizeof(Bucket) == 128);
-    }
-
     uint8_t tagAt(int32_t slotIndex) {
       return reinterpret_cast<uint8_t*>(&tags_)[slotIndex];
     }
@@ -622,6 +643,7 @@
     char padding_[16];
   };

+  static_assert(sizeof(Bucket) == 128);
   static constexpr uint64_t kBucketSize = sizeof(Bucket);

   // Returns the bucket at byte offset 'offset' from 'table_'.
@@ -881,6 +903,10 @@
     }
   }

+  int sizeBits() const final {
+    return sizeBits_;
+  }
+
   // The min table size in row to trigger parallel join table build.
   const uint32_t minTableSizeForParallelJoinBuild_;

@@ -938,7 +964,7 @@

   // Executor for parallelizing hash join build. This may be the
   // executor for Drivers. If this executor is indefinitely taken by
-  // other work, the thread of prepareJoinTables() will sequentially
+  // other work, the thread of prepareJoinTable() will sequentially
   // execute the parallel build steps.
   folly::Executor* buildExecutor_{nullptr};
13 changes: 10 additions & 3 deletions velox/exec/RowNumber.cpp
@@ -78,7 +78,12 @@ void RowNumber::addInput(RowVectorPtr input) {
   }

   SelectivityVector rows(numInput);
-  table_->prepareForGroupProbe(*lookup_, input, rows, false);
+  table_->prepareForGroupProbe(
+      *lookup_,
+      input,
+      rows,
+      false,
+      BaseHashTable::kNoSpillInputStartPartitionBit);
   table_->groupProbe(*lookup_);

   // Initialize new partitions with zeros.
@@ -93,7 +98,8 @@
 void RowNumber::addSpillInput() {
   const auto numInput = input_->size();
   SelectivityVector rows(numInput);
-  table_->prepareForGroupProbe(*lookup_, input_, rows, false);
+  table_->prepareForGroupProbe(
+      *lookup_, input_, rows, false, spillConfig_->startPartitionBit);
   table_->groupProbe(*lookup_);

   // Initialize new partitions with zeros.
@@ -157,7 +163,8 @@ void RowNumber::restoreNextSpillPartition() {

   const auto numInput = input->size();
   SelectivityVector rows(numInput);
-  table_->prepareForGroupProbe(*lookup_, input, rows, false);
+  table_->prepareForGroupProbe(
+      *lookup_, input, rows, false, spillConfig_->startPartitionBit);
   table_->groupProbe(*lookup_);

   auto* counts = data->children().back()->as<FlatVector<int64_t>>();
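Note the asymmetry between the three call sites above: addInput sees fresh input that spans all partitions and passes kNoSpillInputStartPartitionBit, while addSpillInput and restoreNextSpillPartition consume rows read back from a single spill partition and pass spillConfig_->startPartitionBit so the overlap check actually runs.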
7 changes: 6 additions & 1 deletion velox/exec/TopNRowNumber.cpp
@@ -191,7 +191,12 @@ void TopNRowNumber::addInput(RowVectorPtr input) {
   ensureInputFits(input);

   SelectivityVector rows(numInput);
-  table_->prepareForGroupProbe(*lookup_, input, rows, false);
+  table_->prepareForGroupProbe(
+      *lookup_,
+      input,
+      rows,
+      false,
+      BaseHashTable::kNoSpillInputStartPartitionBit);
   table_->groupProbe(*lookup_);

   // Initialize new partitions.
20 changes: 20 additions & 0 deletions velox/exec/tests/HashJoinTest.cpp
@@ -5038,6 +5038,22 @@ TEST_F(HashJoinTest, spillFileSize) {
   }
 }

+TEST_F(HashJoinTest, spillPartitionBitsOverlap) {
+  auto builder =
+      HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
+          .numDrivers(numDrivers_)
+          .keyTypes({BIGINT(), BIGINT()})
+          .probeVectors(2'000, 3)
+          .buildVectors(2'000, 3)
+          .referenceQuery(
+              "SELECT t_k0, t_k1, t_data, u_k0, u_k1, u_data FROM t, u WHERE t_k0 = u_k0 and t_k1 = u_k1")
+          .config(core::QueryConfig::kSpillStartPartitionBit, "8")
+          .config(core::QueryConfig::kJoinSpillPartitionBits, "1")
+          .checkSpillStats(false)
+          .maxSpillLevel(0);
+  VELOX_ASSERT_THROW(builder.run(), "vs. 8");
+}
+
 // The test is to verify if the hash build reservation has been released on
 // task error.
 DEBUG_ONLY_TEST_F(HashJoinTest, buildReservationReleaseCheck) {
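The new test pins kSpillStartPartitionBit down to 8 so that even a modest build table needs more than 8 bucket-index bits; the expected substring "vs. 8" matches the failure message of VELOX_CHECK_LE(sizeBits(), spillInputStartPartitionBit) when the check fires.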
@@ -5242,6 +5258,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringInputProcessing) {
           .spillDirectory(testData.spillEnabled ? tempDirectory->path : "")
           .referenceQuery(
               "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1")
+          .config(core::QueryConfig::kSpillStartPartitionBit, "29")
           .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
             const auto statsPair = taskSpilledStats(*task);
             if (testData.expectedReclaimable) {
@@ -5394,6 +5411,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringReserve) {
           .spillDirectory(tempDirectory->path)
           .referenceQuery(
               "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1")
+          .config(core::QueryConfig::kSpillStartPartitionBit, "29")
           .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
             const auto statsPair = taskSpilledStats(*task);
             ASSERT_GT(statsPair.first.spilledBytes, 0);
@@ -5788,6 +5806,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) {
           .spillDirectory(tempDirectory->path)
           .referenceQuery(
               "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1")
+          .config(core::QueryConfig::kSpillStartPartitionBit, "29")
           .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
             const auto statsPair = taskSpilledStats(*task);
             ASSERT_GT(statsPair.first.spilledBytes, 0);
@@ -6351,6 +6370,7 @@ TEST_F(HashJoinTest, exceededMaxSpillLevel) {
           .spillDirectory(tempDirectory->path)
           .referenceQuery(
               "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1")
+          .config(core::QueryConfig::kSpillStartPartitionBit, "29")
           .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
             auto joinStats = task->taskStats()
                                  .pipelineStats.back()
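The pre-existing spill tests pin kSpillStartPartitionBit to 29, the old default, presumably so their spill-triggering setups keep the partition placement they were written against now that the shipped default is 48.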
1 change: 1 addition & 0 deletions velox/exec/tests/utils/ArbitratorTestUtil.cpp
@@ -99,6 +99,7 @@ QueryTestResult runHashJoinTask(
           .spillDirectory(spillDirectory->path)
           .config(core::QueryConfig::kSpillEnabled, true)
           .config(core::QueryConfig::kJoinSpillEnabled, true)
+          .config(core::QueryConfig::kSpillStartPartitionBit, "29")
           .queryCtx(queryCtx)
           .maxDrivers(numDrivers)
           .copyResults(pool, result.task);