From 989c22448dd1b06c27294586ec2f125379e60637 Mon Sep 17 00:00:00 2001 From: Laurens Kuiper Date: Mon, 24 Jul 2023 16:41:28 +0200 Subject: [PATCH] add fixed size map --- src/common/radix_partitioning.cpp | 5 +- .../types/row/partitioned_tuple_data.cpp | 119 +++++++++--------- src/include/duckdb/common/perfect_map_set.hpp | 1 + .../types/row/partitioned_tuple_data.hpp | 10 +- src/parallel/pipeline_finish_event.cpp | 2 +- 5 files changed, 71 insertions(+), 66 deletions(-) diff --git a/src/common/radix_partitioning.cpp b/src/common/radix_partitioning.cpp index abadabcf40b4..f6ac42fc87f7 100644 --- a/src/common/radix_partitioning.cpp +++ b/src/common/radix_partitioning.cpp @@ -26,7 +26,7 @@ struct RadixPartitioningConstants { }; template -RETURN_TYPE RadixBitsSwitch(idx_t radix_bits, ARGS &&... args) { +RETURN_TYPE RadixBitsSwitch(idx_t radix_bits, ARGS &&...args) { D_ASSERT(radix_bits <= RadixPartitioning::MAX_RADIX_BITS); switch (radix_bits) { case 0: @@ -187,6 +187,9 @@ void RadixPartitionedTupleData::InitializeAppendStateInternal(PartitionedTupleDa column_ids.emplace_back(col_idx); } partitions[0]->InitializeAppend(state.chunk_state, std::move(column_ids)); + + // Initialize fixed-size map + state.fixed_partition_entries.resize(RadixPartitioning::NumberOfPartitions(radix_bits)); } void RadixPartitionedTupleData::ComputePartitionIndices(PartitionedTupleDataAppendState &state, DataChunk &input) { diff --git a/src/common/types/row/partitioned_tuple_data.cpp b/src/common/types/row/partitioned_tuple_data.cpp index 87ad2d665794..175687ed801c 100644 --- a/src/common/types/row/partitioned_tuple_data.cpp +++ b/src/common/types/row/partitioned_tuple_data.cpp @@ -47,6 +47,10 @@ void PartitionedTupleData::Append(PartitionedTupleDataAppendState &state, DataCh AppendUnified(state, input, append_sel, append_count); } +bool PartitionedTupleData::UseFixedSizeMap() const { + return MaxPartitionIndex() < PartitionedTupleDataAppendState::MAP_THRESHOLD; +} + void 
PartitionedTupleData::AppendUnified(PartitionedTupleDataAppendState &state, DataChunk &input, const SelectionVector &append_sel, const idx_t append_count) { const idx_t actual_append_count = append_count == DConstants::INVALID_INDEX ? input.size() : append_count; @@ -58,11 +62,19 @@ void PartitionedTupleData::AppendUnified(PartitionedTupleDataAppendState &state, BuildPartitionSel(state, append_sel, actual_append_count); // Early out: check if everything belongs to a single partition - const auto &partition_entries = state.partition_entries; - if (partition_entries.size() == 1) { - const auto &partition_index = partition_entries.begin()->first; - auto &partition = *partitions[partition_index]; - auto &partition_pin_state = *state.partition_pin_states[partition_index]; + optional_idx partition_index; + if (UseFixedSizeMap()) { + if (state.fixed_partition_entries.size() == 1) { + partition_index = state.fixed_partition_entries.begin()->first; + } + } else { + if (state.partition_entries.size() == 1) { + partition_index = state.partition_entries.begin()->first; + } + } + if (partition_index.IsValid()) { + auto &partition = *partitions[partition_index.GetIndex()]; + auto &partition_pin_state = *state.partition_pin_states[partition_index.GetIndex()]; const auto size_before = partition.SizeInBytes(); partition.AppendUnified(partition_pin_state, state.chunk_state, input, append_sel, actual_append_count); @@ -122,33 +134,30 @@ void PartitionedTupleData::Append(PartitionedTupleDataAppendState &state, TupleD void PartitionedTupleData::BuildPartitionSel(PartitionedTupleDataAppendState &state, const SelectionVector &append_sel, const idx_t append_count) { + if (UseFixedSizeMap()) { + BuildPartitionSel>(state, state.fixed_partition_entries, append_sel, + append_count); + } else { + BuildPartitionSel>(state, state.partition_entries, append_sel, append_count); + } +} + +template +void PartitionedTupleData::BuildPartitionSel(PartitionedTupleDataAppendState &state, MAP_TYPE 
&partition_entries, + const SelectionVector &append_sel, const idx_t append_count) { const auto partition_indices = FlatVector::GetData(state.partition_indices); - auto &partition_entries = state.partition_entries; - auto &partition_entries_arr = state.partition_entries_arr; partition_entries.clear(); - const auto max_partition_index = MaxPartitionIndex(); - const auto use_arr = max_partition_index < PartitionedTupleDataAppendState::MAP_THRESHOLD; - switch (state.partition_indices.GetVectorType()) { case VectorType::FLAT_VECTOR: - if (use_arr) { - std::fill_n(partition_entries_arr, max_partition_index + 1, list_entry_t(0, 0)); - for (idx_t i = 0; i < append_count; i++) { - const auto index = append_sel.get_index(i); - const auto &partition_index = partition_indices[index]; - partition_entries_arr[partition_index].length++; - } - } else { - for (idx_t i = 0; i < append_count; i++) { - const auto index = append_sel.get_index(i); - const auto &partition_index = partition_indices[index]; - auto partition_entry = partition_entries.find(partition_index); - if (partition_entry == partition_entries.end()) { - partition_entries.emplace(partition_index, list_entry_t(0, 1)); - } else { - partition_entry->second.length++; - } + for (idx_t i = 0; i < append_count; i++) { + const auto index = append_sel.get_index(i); + const auto &partition_index = partition_indices[index]; + auto partition_entry = partition_entries.find(partition_index); + if (partition_entry == partition_entries.end()) { + partition_entries[partition_index] = list_entry_t(0, 1); + } else { + partition_entry->second.length++; } } break; @@ -171,51 +180,35 @@ void PartitionedTupleData::BuildPartitionSel(PartitionedTupleDataAppendState &st // Compute offsets from the counts idx_t offset = 0; - if (use_arr) { - for (idx_t partition_index = 0; partition_index <= max_partition_index; partition_index++) { - auto &partition_entry = partition_entries_arr[partition_index]; - partition_entry.offset = offset; - offset += 
partition_entry.length; - } - } else { - for (auto &pc : partition_entries) { - auto &partition_entry = pc.second; - partition_entry.offset = offset; - offset += partition_entry.length; - } + for (auto &pc : partition_entries) { + auto &partition_entry = pc.second; + partition_entry.offset = offset; + offset += partition_entry.length; } // Now initialize a single selection vector that acts as a selection vector for every partition auto &partition_sel = state.partition_sel; auto &reverse_partition_sel = state.reverse_partition_sel; - if (use_arr) { - for (idx_t i = 0; i < append_count; i++) { - const auto index = append_sel.get_index(i); - const auto &partition_index = partition_indices[index]; - auto &partition_offset = partition_entries_arr[partition_index].offset; - reverse_partition_sel[index] = partition_offset; - partition_sel[partition_offset++] = index; - } - // Now just add it to the map anyway so the rest of the functionality is shared - for (idx_t partition_index = 0; partition_index <= max_partition_index; partition_index++) { - const auto &partition_entry = partition_entries_arr[partition_index]; - if (partition_entry.length != 0) { - partition_entries.emplace(partition_index, partition_entry); - } - } - } else { - for (idx_t i = 0; i < append_count; i++) { - const auto index = append_sel.get_index(i); - const auto &partition_index = partition_indices[index]; - auto &partition_offset = partition_entries[partition_index].offset; - reverse_partition_sel[index] = partition_offset; - partition_sel[partition_offset++] = index; - } + for (idx_t i = 0; i < append_count; i++) { + const auto index = append_sel.get_index(i); + const auto &partition_index = partition_indices[index]; + auto &partition_offset = partition_entries[partition_index].offset; + reverse_partition_sel[index] = partition_offset; + partition_sel[partition_offset++] = index; } } void PartitionedTupleData::BuildBufferSpace(PartitionedTupleDataAppendState &state) { - for (auto &pc : 
state.partition_entries) { + if (UseFixedSizeMap()) { + BuildBufferSpace>(state, state.fixed_partition_entries); + } else { + BuildBufferSpace>(state, state.partition_entries); + } +} + +template +void PartitionedTupleData::BuildBufferSpace(PartitionedTupleDataAppendState &state, MAP_TYPE &partition_entries) { + for (auto &pc : partition_entries) { const auto &partition_index = pc.first; // Partition, pin state for this partition index diff --git a/src/include/duckdb/common/perfect_map_set.hpp b/src/include/duckdb/common/perfect_map_set.hpp index 6c31b83d5bd1..1e735a638559 100644 --- a/src/include/duckdb/common/perfect_map_set.hpp +++ b/src/include/duckdb/common/perfect_map_set.hpp @@ -9,6 +9,7 @@ #pragma once #include "duckdb/common/types.hpp" +#include "duckdb/common/types/validity_mask.hpp" #include "duckdb/common/unordered_map.hpp" #include "duckdb/common/unordered_set.hpp" diff --git a/src/include/duckdb/common/types/row/partitioned_tuple_data.hpp b/src/include/duckdb/common/types/row/partitioned_tuple_data.hpp index 9fe134f36367..691fe0a45dcb 100644 --- a/src/include/duckdb/common/types/row/partitioned_tuple_data.hpp +++ b/src/include/duckdb/common/types/row/partitioned_tuple_data.hpp @@ -8,6 +8,7 @@ #pragma once +#include "duckdb/common/fixed_size_map.hpp" #include "duckdb/common/perfect_map_set.hpp" #include "duckdb/common/types/row/tuple_data_allocator.hpp" #include "duckdb/common/types/row/tuple_data_collection.hpp" @@ -27,7 +28,7 @@ struct PartitionedTupleDataAppendState { static constexpr idx_t MAP_THRESHOLD = 32; perfect_map_t partition_entries; - list_entry_t partition_entries_arr[MAP_THRESHOLD]; + fixed_size_map_t fixed_partition_entries; vector> partition_pin_states; TupleDataChunkState chunk_state; @@ -134,12 +135,19 @@ class PartitionedTupleData { //! Create a new shared allocator void CreateAllocator(); + //! Whether to use fixed size map or regular map + bool UseFixedSizeMap() const; //! 
Builds a selection vector in the Append state for the partitions //! - returns true if everything belongs to the same partition - stores partition index in single_partition_idx void BuildPartitionSel(PartitionedTupleDataAppendState &state, const SelectionVector &append_sel, const idx_t append_count); + template + void BuildPartitionSel(PartitionedTupleDataAppendState &state, MAP_TYPE &partition_entries, + const SelectionVector &append_sel, const idx_t append_count); //! Builds out the buffer space in the partitions void BuildBufferSpace(PartitionedTupleDataAppendState &state); + template + void BuildBufferSpace(PartitionedTupleDataAppendState &state, MAP_TYPE &partition_entries); //! Create a collection for a specific a partition unique_ptr CreatePartitionCollection(idx_t partition_index) const { if (allocators) { diff --git a/src/parallel/pipeline_finish_event.cpp b/src/parallel/pipeline_finish_event.cpp index 056bf2f0770a..a3858a2ddf33 100644 --- a/src/parallel/pipeline_finish_event.cpp +++ b/src/parallel/pipeline_finish_event.cpp @@ -51,7 +51,7 @@ class PipelineFinishTask : public ExecutorTask { //! Debugging state: number of times blocked int debug_blocked_count = 0; //! Number of times the Finalize will block before actually returning data - int debug_blocked_target_count = 1; + int debug_blocked_target_count = 10; #endif };