Skip to content

Commit

Permalink
add fixed size map
Browse files Browse the repository at this point in the history
  • Loading branch information
lnkuiper committed Jul 24, 2023
1 parent 74f2362 commit 989c224
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 66 deletions.
5 changes: 4 additions & 1 deletion src/common/radix_partitioning.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct RadixPartitioningConstants {
};

template <class OP, class RETURN_TYPE, typename... ARGS>
RETURN_TYPE RadixBitsSwitch(idx_t radix_bits, ARGS &&... args) {
RETURN_TYPE RadixBitsSwitch(idx_t radix_bits, ARGS &&...args) {
D_ASSERT(radix_bits <= RadixPartitioning::MAX_RADIX_BITS);
switch (radix_bits) {
case 0:
Expand Down Expand Up @@ -187,6 +187,9 @@ void RadixPartitionedTupleData::InitializeAppendStateInternal(PartitionedTupleDa
column_ids.emplace_back(col_idx);
}
partitions[0]->InitializeAppend(state.chunk_state, std::move(column_ids));

// Initialize fixed-size map
state.fixed_partition_entries.resize(RadixPartitioning::NumberOfPartitions(radix_bits));
}

void RadixPartitionedTupleData::ComputePartitionIndices(PartitionedTupleDataAppendState &state, DataChunk &input) {
Expand Down
119 changes: 56 additions & 63 deletions src/common/types/row/partitioned_tuple_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ void PartitionedTupleData::Append(PartitionedTupleDataAppendState &state, DataCh
AppendUnified(state, input, append_sel, append_count);
}

bool PartitionedTupleData::UseFixedSizeMap() const {
return MaxPartitionIndex() < PartitionedTupleDataAppendState::MAP_THRESHOLD;
}

void PartitionedTupleData::AppendUnified(PartitionedTupleDataAppendState &state, DataChunk &input,
const SelectionVector &append_sel, const idx_t append_count) {
const idx_t actual_append_count = append_count == DConstants::INVALID_INDEX ? input.size() : append_count;
Expand All @@ -58,11 +62,19 @@ void PartitionedTupleData::AppendUnified(PartitionedTupleDataAppendState &state,
BuildPartitionSel(state, append_sel, actual_append_count);

// Early out: check if everything belongs to a single partition
const auto &partition_entries = state.partition_entries;
if (partition_entries.size() == 1) {
const auto &partition_index = partition_entries.begin()->first;
auto &partition = *partitions[partition_index];
auto &partition_pin_state = *state.partition_pin_states[partition_index];
optional_idx partition_index;
if (UseFixedSizeMap()) {
if (state.fixed_partition_entries.size() == 1) {
partition_index = state.fixed_partition_entries.begin()->first;
}
} else {
if (state.partition_entries.size() == 1) {
partition_index = state.partition_entries.begin()->first;
}
}
if (partition_index.IsValid()) {
auto &partition = *partitions[partition_index.GetIndex()];
auto &partition_pin_state = *state.partition_pin_states[partition_index.GetIndex()];

const auto size_before = partition.SizeInBytes();
partition.AppendUnified(partition_pin_state, state.chunk_state, input, append_sel, actual_append_count);
Expand Down Expand Up @@ -122,33 +134,30 @@ void PartitionedTupleData::Append(PartitionedTupleDataAppendState &state, TupleD

void PartitionedTupleData::BuildPartitionSel(PartitionedTupleDataAppendState &state, const SelectionVector &append_sel,
const idx_t append_count) {
if (UseFixedSizeMap()) {
BuildPartitionSel<fixed_size_map_t<list_entry_t>>(state, state.fixed_partition_entries, append_sel,
append_count);
} else {
BuildPartitionSel<perfect_map_t<list_entry_t>>(state, state.partition_entries, append_sel, append_count);
}
}

template <class MAP_TYPE>
void PartitionedTupleData::BuildPartitionSel(PartitionedTupleDataAppendState &state, MAP_TYPE &partition_entries,
const SelectionVector &append_sel, const idx_t append_count) {
const auto partition_indices = FlatVector::GetData<idx_t>(state.partition_indices);
auto &partition_entries = state.partition_entries;
auto &partition_entries_arr = state.partition_entries_arr;
partition_entries.clear();

const auto max_partition_index = MaxPartitionIndex();
const auto use_arr = max_partition_index < PartitionedTupleDataAppendState::MAP_THRESHOLD;

switch (state.partition_indices.GetVectorType()) {
case VectorType::FLAT_VECTOR:
if (use_arr) {
std::fill_n(partition_entries_arr, max_partition_index + 1, list_entry_t(0, 0));
for (idx_t i = 0; i < append_count; i++) {
const auto index = append_sel.get_index(i);
const auto &partition_index = partition_indices[index];
partition_entries_arr[partition_index].length++;
}
} else {
for (idx_t i = 0; i < append_count; i++) {
const auto index = append_sel.get_index(i);
const auto &partition_index = partition_indices[index];
auto partition_entry = partition_entries.find(partition_index);
if (partition_entry == partition_entries.end()) {
partition_entries.emplace(partition_index, list_entry_t(0, 1));
} else {
partition_entry->second.length++;
}
for (idx_t i = 0; i < append_count; i++) {
const auto index = append_sel.get_index(i);
const auto &partition_index = partition_indices[index];
auto partition_entry = partition_entries.find(partition_index);
if (partition_entry == partition_entries.end()) {
partition_entries[partition_index] = list_entry_t(0, 1);
} else {
partition_entry->second.length++;
}
}
break;
Expand All @@ -171,51 +180,35 @@ void PartitionedTupleData::BuildPartitionSel(PartitionedTupleDataAppendState &st

// Compute offsets from the counts
idx_t offset = 0;
if (use_arr) {
for (idx_t partition_index = 0; partition_index <= max_partition_index; partition_index++) {
auto &partition_entry = partition_entries_arr[partition_index];
partition_entry.offset = offset;
offset += partition_entry.length;
}
} else {
for (auto &pc : partition_entries) {
auto &partition_entry = pc.second;
partition_entry.offset = offset;
offset += partition_entry.length;
}
for (auto &pc : partition_entries) {
auto &partition_entry = pc.second;
partition_entry.offset = offset;
offset += partition_entry.length;
}

// Now initialize a single selection vector that acts as a selection vector for every partition
auto &partition_sel = state.partition_sel;
auto &reverse_partition_sel = state.reverse_partition_sel;
if (use_arr) {
for (idx_t i = 0; i < append_count; i++) {
const auto index = append_sel.get_index(i);
const auto &partition_index = partition_indices[index];
auto &partition_offset = partition_entries_arr[partition_index].offset;
reverse_partition_sel[index] = partition_offset;
partition_sel[partition_offset++] = index;
}
// Now just add it to the map anyway so the rest of the functionality is shared
for (idx_t partition_index = 0; partition_index <= max_partition_index; partition_index++) {
const auto &partition_entry = partition_entries_arr[partition_index];
if (partition_entry.length != 0) {
partition_entries.emplace(partition_index, partition_entry);
}
}
} else {
for (idx_t i = 0; i < append_count; i++) {
const auto index = append_sel.get_index(i);
const auto &partition_index = partition_indices[index];
auto &partition_offset = partition_entries[partition_index].offset;
reverse_partition_sel[index] = partition_offset;
partition_sel[partition_offset++] = index;
}
for (idx_t i = 0; i < append_count; i++) {
const auto index = append_sel.get_index(i);
const auto &partition_index = partition_indices[index];
auto &partition_offset = partition_entries[partition_index].offset;
reverse_partition_sel[index] = partition_offset;
partition_sel[partition_offset++] = index;
}
}

void PartitionedTupleData::BuildBufferSpace(PartitionedTupleDataAppendState &state) {
for (auto &pc : state.partition_entries) {
if (UseFixedSizeMap()) {
BuildBufferSpace<fixed_size_map_t<list_entry_t>>(state, state.fixed_partition_entries);
} else {
BuildBufferSpace<perfect_map_t<list_entry_t>>(state, state.partition_entries);
}
}

template <class MAP_TYPE>
void PartitionedTupleData::BuildBufferSpace(PartitionedTupleDataAppendState &state, MAP_TYPE &partition_entries) {
for (auto &pc : partition_entries) {
const auto &partition_index = pc.first;

// Partition, pin state for this partition index
Expand Down
1 change: 1 addition & 0 deletions src/include/duckdb/common/perfect_map_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#pragma once

#include "duckdb/common/types.hpp"
#include "duckdb/common/types/validity_mask.hpp"
#include "duckdb/common/unordered_map.hpp"
#include "duckdb/common/unordered_set.hpp"

Expand Down
10 changes: 9 additions & 1 deletion src/include/duckdb/common/types/row/partitioned_tuple_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#pragma once

#include "duckdb/common/fixed_size_map.hpp"
#include "duckdb/common/perfect_map_set.hpp"
#include "duckdb/common/types/row/tuple_data_allocator.hpp"
#include "duckdb/common/types/row/tuple_data_collection.hpp"
Expand All @@ -27,7 +28,7 @@ struct PartitionedTupleDataAppendState {

static constexpr idx_t MAP_THRESHOLD = 32;
perfect_map_t<list_entry_t> partition_entries;
list_entry_t partition_entries_arr[MAP_THRESHOLD];
fixed_size_map_t<list_entry_t> fixed_partition_entries;

vector<unique_ptr<TupleDataPinState>> partition_pin_states;
TupleDataChunkState chunk_state;
Expand Down Expand Up @@ -134,12 +135,19 @@ class PartitionedTupleData {

//! Create a new shared allocator
void CreateAllocator();
//! Whether to use fixed size map or regular marp
bool UseFixedSizeMap() const;
//! Builds a selection vector in the Append state for the partitions
//! - returns true if everything belongs to the same partition - stores partition index in single_partition_idx
void BuildPartitionSel(PartitionedTupleDataAppendState &state, const SelectionVector &append_sel,
const idx_t append_count);
template <class MAP_TYPE>
void BuildPartitionSel(PartitionedTupleDataAppendState &state, MAP_TYPE &partition_entries,
const SelectionVector &append_sel, const idx_t append_count);
//! Builds out the buffer space in the partitions
void BuildBufferSpace(PartitionedTupleDataAppendState &state);
template <class MAP_TYPE>
void BuildBufferSpace(PartitionedTupleDataAppendState &state, MAP_TYPE &partition_entries);
//! Create a collection for a specific a partition
unique_ptr<TupleDataCollection> CreatePartitionCollection(idx_t partition_index) const {
if (allocators) {
Expand Down
2 changes: 1 addition & 1 deletion src/parallel/pipeline_finish_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class PipelineFinishTask : public ExecutorTask {
//! Debugging state: number of times blocked
int debug_blocked_count = 0;
//! Number of times the Finalize will block before actually returning data
int debug_blocked_target_count = 1;
int debug_blocked_target_count = 10;
#endif
};

Expand Down

0 comments on commit 989c224

Please sign in to comment.