Skip to content

Commit

Permalink
improve out-of-core for 1/2 threads
Browse files (browse the repository at this point in the history)
  • Loading branch information
lnkuiper committed Oct 16, 2024
1 parent 92d9080 commit 911060e
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 14 deletions.
4 changes: 2 additions & 2 deletions src/execution/aggregate_hashtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ void GroupedAggregateHashTable::SetRadixBits(idx_t radix_bits_p) {
void GroupedAggregateHashTable::Resize(idx_t size) {
D_ASSERT(size >= STANDARD_VECTOR_SIZE);
D_ASSERT(IsPowerOfTwo(size));
if (size < capacity) {
throw InternalException("Cannot downsize a hash table!");
if (Count() != 0 && size < capacity) {
throw InternalException("Cannot downsize a non-empty hash table!");
}

capacity = size;
Expand Down
18 changes: 6 additions & 12 deletions src/execution/radix_partitioned_hashtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,13 @@ struct RadixHTConfig {
static constexpr const idx_t MAXIMUM_INITIAL_SINK_RADIX_BITS = 3;
//! Maximum Sink radix bits (independent of threads)
static constexpr const idx_t MAXIMUM_FINAL_SINK_RADIX_BITS = 7;
//! By how many radix bits to increment if we go external
static constexpr const idx_t EXTERNAL_RADIX_BITS_INCREMENT = 3;

//! The global sink state
RadixHTGlobalSinkState &sink;
//! Current thread-global sink radix bits
atomic<idx_t> sink_radix_bits;
//! Maximum Sink radix bits (set based on number of threads)
const idx_t maximum_sink_radix_bits;
//! Radix bits if we go external
const idx_t external_radix_bits;

public:
//! Capacity of HTs during the Sink
Expand Down Expand Up @@ -256,16 +252,15 @@ void RadixHTGlobalSinkState::Destroy() {

RadixHTConfig::RadixHTConfig(ClientContext &context, RadixHTGlobalSinkState &sink_p)
: sink(sink_p), sink_radix_bits(InitialSinkRadixBits(context)),
maximum_sink_radix_bits(MaximumSinkRadixBits(context)),
external_radix_bits(ExternalRadixBits(maximum_sink_radix_bits)), sink_capacity(SinkCapacity(context)) {
maximum_sink_radix_bits(MaximumSinkRadixBits(context)), sink_capacity(SinkCapacity(context)) {
}

//! Requests a new number of sink radix bits.
//! The requested value is clamped to maximum_sink_radix_bits before being forwarded to
//! SetRadixBitsInternal; the second argument is always 'false' here (SetRadixBitsToExternal
//! is the caller that passes 'true').
//! NOTE(review): the second argument presumably is the "external" flag - confirm against
//! SetRadixBitsInternal, which is not visible in this hunk.
void RadixHTConfig::SetRadixBits(idx_t radix_bits_p) {
SetRadixBitsInternal(MinValue(radix_bits_p, maximum_sink_radix_bits), false);
}

bool RadixHTConfig::SetRadixBitsToExternal() {
SetRadixBitsInternal(external_radix_bits, true);
SetRadixBitsInternal(MAXIMUM_FINAL_SINK_RADIX_BITS, true);
return sink.external;
}

Expand Down Expand Up @@ -301,10 +296,6 @@ idx_t RadixHTConfig::MaximumSinkRadixBits(ClientContext &context) {
MAXIMUM_FINAL_SINK_RADIX_BITS);
}

//! Computes the radix bits to use when going external:
//! the given maximum sink radix bits plus EXTERNAL_RADIX_BITS_INCREMENT,
//! capped at MAXIMUM_FINAL_SINK_RADIX_BITS.
idx_t RadixHTConfig::ExternalRadixBits(const idx_t &maximum_sink_radix_bits_p) {
return MinValue(maximum_sink_radix_bits_p + EXTERNAL_RADIX_BITS_INCREMENT, MAXIMUM_FINAL_SINK_RADIX_BITS);
}

idx_t RadixHTConfig::SinkCapacity(ClientContext &context) {
// Get active and maximum number of threads
const auto active_threads = NumericCast<idx_t>(TaskScheduler::GetScheduler(context).NumberOfThreads());
Expand Down Expand Up @@ -464,7 +455,7 @@ void RadixPartitionedHashTable::Sink(ExecutionContext &context, DataChunk &chunk
return; // We can fit another chunk
}

if (gstate.number_of_threads > 2) {
if (gstate.number_of_threads > 2 || gstate.external) {
// 'Reset' the HT without taking its data, we can just keep appending to the same collection
// This only works because we never resize the HT
ht.ClearPointerTable();
Expand All @@ -479,6 +470,9 @@ void RadixPartitionedHashTable::Sink(ExecutionContext &context, DataChunk &chunk
// We repartitioned, but we didn't clear the pointer table / reset the count because we're on 1 or 2 threads
ht.ClearPointerTable();
ht.ResetCount();
if (gstate.external) {
ht.Resize(gstate.config.sink_capacity);
}
}

// TODO: combine early and often
Expand Down

0 comments on commit 911060e

Please sign in to comment.