improve tuple data append performance
lnkuiper committed Jul 25, 2023
1 parent 989c224 commit 7b8cca4
Showing 9 changed files with 97 additions and 74 deletions.
53 changes: 39 additions & 14 deletions src/common/types/row/partitioned_tuple_data.cpp
@@ -65,7 +65,7 @@ void PartitionedTupleData::AppendUnified(PartitionedTupleDataAppendState &state,
optional_idx partition_index;
if (UseFixedSizeMap()) {
if (state.fixed_partition_entries.size() == 1) {
partition_index = state.fixed_partition_entries.begin()->first;
partition_index = state.fixed_partition_entries.begin().GetKey();
}
} else {
if (state.partition_entries.size() == 1) {
@@ -132,17 +132,40 @@ void PartitionedTupleData::Append(PartitionedTupleDataAppendState &state, TupleD
Verify();
}

template <class MAP_TYPE>
struct UnorderedMapGetter {
static inline const typename MAP_TYPE::key_type &GetKey(typename MAP_TYPE::iterator &iterator) {
return iterator->first;
}

static inline typename MAP_TYPE::mapped_type &GetValue(typename MAP_TYPE::iterator &iterator) {
return iterator->second;
}
};

template <class T>
struct FixedSizeMapGetter {
static inline const idx_t &GetKey(fixed_size_map_iterator_t<T> &iterator) {
return iterator.GetKey();
}

static inline T &GetValue(fixed_size_map_iterator_t<T> &iterator) {
return iterator.GetValue();
}
};

void PartitionedTupleData::BuildPartitionSel(PartitionedTupleDataAppendState &state, const SelectionVector &append_sel,
const idx_t append_count) {
if (UseFixedSizeMap()) {
BuildPartitionSel<fixed_size_map_t<list_entry_t>>(state, state.fixed_partition_entries, append_sel,
append_count);
BuildPartitionSel<fixed_size_map_t<list_entry_t>, FixedSizeMapGetter<list_entry_t>>(
state, state.fixed_partition_entries, append_sel, append_count);
} else {
BuildPartitionSel<perfect_map_t<list_entry_t>>(state, state.partition_entries, append_sel, append_count);
BuildPartitionSel<perfect_map_t<list_entry_t>, UnorderedMapGetter<perfect_map_t<list_entry_t>>>(
state, state.partition_entries, append_sel, append_count);
}
}

template <class MAP_TYPE>
template <class MAP_TYPE, class GETTER>
void PartitionedTupleData::BuildPartitionSel(PartitionedTupleDataAppendState &state, MAP_TYPE &partition_entries,
const SelectionVector &append_sel, const idx_t append_count) {
const auto partition_indices = FlatVector::GetData<idx_t>(state.partition_indices);
@@ -157,7 +180,7 @@ void PartitionedTupleData::BuildPartitionSel(PartitionedTupleDataAppendState &st
if (partition_entry == partition_entries.end()) {
partition_entries[partition_index] = list_entry_t(0, 1);
} else {
partition_entry->second.length++;
GETTER::GetValue(partition_entry).length++;
}
}
break;
@@ -180,8 +203,8 @@ void PartitionedTupleData::BuildPartitionSel(PartitionedTupleDataAppendState &st

// Compute offsets from the counts
idx_t offset = 0;
for (auto &pc : partition_entries) {
auto &partition_entry = pc.second;
for (auto it = partition_entries.begin(); it != partition_entries.end(); ++it) {
auto &partition_entry = GETTER::GetValue(it);
partition_entry.offset = offset;
offset += partition_entry.length;
}
@@ -200,23 +223,25 @@ void PartitionedTupleData::BuildPartitionSel(PartitionedTupleDataAppendState &st

void PartitionedTupleData::BuildBufferSpace(PartitionedTupleDataAppendState &state) {
if (UseFixedSizeMap()) {
BuildBufferSpace<fixed_size_map_t<list_entry_t>>(state, state.fixed_partition_entries);
BuildBufferSpace<fixed_size_map_t<list_entry_t>, FixedSizeMapGetter<list_entry_t>>(
state, state.fixed_partition_entries);
} else {
BuildBufferSpace<perfect_map_t<list_entry_t>>(state, state.partition_entries);
BuildBufferSpace<perfect_map_t<list_entry_t>, UnorderedMapGetter<perfect_map_t<list_entry_t>>>(
state, state.partition_entries);
}
}

template <class MAP_TYPE>
template <class MAP_TYPE, class GETTER>
void PartitionedTupleData::BuildBufferSpace(PartitionedTupleDataAppendState &state, MAP_TYPE &partition_entries) {
for (auto &pc : partition_entries) {
const auto &partition_index = pc.first;
for (auto it = partition_entries.begin(); it != partition_entries.end(); ++it) {
const auto &partition_index = GETTER::GetKey(it);

// Partition, pin state for this partition index
auto &partition = *partitions[partition_index];
auto &partition_pin_state = *state.partition_pin_states[partition_index];

// Length and offset for this partition
const auto &partition_entry = pc.second;
const auto &partition_entry = GETTER::GetValue(it);
const auto &partition_length = partition_entry.length;
const auto partition_offset = partition_entry.offset - partition_length;

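A note on the pattern introduced above: the loops no longer reach into pc.first / pc.second directly; instead every templated call site also receives a GETTER policy struct, so the same code can walk both perfect_map_t (in essence an unordered_map keyed by idx_t) and the custom fixed_size_map_t, whose iterator exposes GetKey()/GetValue() rather than dereferencing to a pair. The following is a minimal, standalone sketch of that policy pattern; it is not the DuckDB code, and the CountingMap / SumValues names and the toy flat map are illustrative assumptions only.

// Sketch: one templated loop, two container styles, selected via a GETTER policy.
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <vector>

using idx_t = uint64_t;

// Policy for standard map-like containers: key/value live behind iterator->first / ->second.
template <class MAP_TYPE>
struct UnorderedMapGetter {
	static const typename MAP_TYPE::key_type &GetKey(typename MAP_TYPE::iterator &it) {
		return it->first;
	}
	static typename MAP_TYPE::mapped_type &GetValue(typename MAP_TYPE::iterator &it) {
		return it->second;
	}
};

// A toy flat map keyed by small dense indices; its iterator is just an index into "values".
struct CountingMap {
	struct Iterator {
		CountingMap &map;
		idx_t current;
		Iterator &operator++() { ++current; return *this; }
		bool operator!=(const Iterator &other) const { return current != other.current; }
		const idx_t &GetKey() const { return current; }
		idx_t &GetValue() { return map.values[current]; }
	};
	std::vector<idx_t> values;
	Iterator begin() { return {*this, 0}; }
	Iterator end() { return {*this, values.size()}; }
};

// Policy for the flat map: key/value come from the iterator's accessors.
struct CountingMapGetter {
	static const idx_t &GetKey(CountingMap::Iterator &it) { return it.GetKey(); }
	static idx_t &GetValue(CountingMap::Iterator &it) { return it.GetValue(); }
};

// The same templated loop works for both container styles.
template <class MAP_TYPE, class GETTER>
idx_t SumValues(MAP_TYPE &map) {
	idx_t sum = 0;
	for (auto it = map.begin(); it != map.end(); ++it) {
		sum += GETTER::GetValue(it);
	}
	return sum;
}

int main() {
	std::unordered_map<idx_t, idx_t> hashed {{0, 1}, {3, 2}};
	CountingMap flat {{1, 2, 3}};
	std::cout << SumValues<decltype(hashed), UnorderedMapGetter<decltype(hashed)>>(hashed) << "\n"; // 3
	std::cout << SumValues<CountingMap, CountingMapGetter>(flat) << "\n";                           // 6
	return 0;
}

The benefit of the policy indirection is that the flat map never has to materialise a std::pair just to look like an unordered_map.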
61 changes: 29 additions & 32 deletions src/common/types/row/tuple_data_allocator.cpp
@@ -58,7 +58,7 @@ void TupleDataAllocator::Build(TupleDataSegment &segment, TupleDataPinState &pin
}

// Build the chunk parts for the incoming data
unsafe_vector<pair<idx_t, idx_t>> chunk_part_indices;
chunk_part_indices.clear();
idx_t offset = 0;
while (offset != append_count) {
if (chunks.empty() || chunks.back().count == STANDARD_VECTOR_SIZE) {
@@ -68,30 +68,26 @@

// Build the next part
auto next = MinValue<idx_t>(append_count - offset, STANDARD_VECTOR_SIZE - chunk.count);

auto part = BuildChunkPart(pin_state, chunk_state, append_offset + offset, next);
segment.data_size += part.count * layout.GetRowWidth();
if (!layout.AllConstant()) {
segment.data_size += part.total_heap_size;
}

chunk.AddPart(std::move(part), layout);
chunk_part_indices.emplace_back(chunks.size() - 1, chunk.parts.size() - 1);

chunk.AddPart(BuildChunkPart(pin_state, chunk_state, append_offset + offset, next), layout);
auto &chunk_part = chunk.parts.back();
next = chunk_part.count;

segment.count += next;
segment.data_size += chunk_part.count * layout.GetRowWidth();
if (!layout.AllConstant()) {
segment.data_size += chunk_part.total_heap_size;
}

offset += next;
chunk_part_indices.emplace_back(chunks.size() - 1, chunk.parts.size() - 1);
}

// Now initialize the pointers to write the data to
unsafe_vector<TupleDataChunkPart *> parts;
parts.reserve(chunk_part_indices.size());
chunk_parts.clear();
for (auto &indices : chunk_part_indices) {
parts.emplace_back(&segment.chunks[indices.first].parts[indices.second]);
chunk_parts.emplace_back(segment.chunks[indices.first].parts[indices.second]);
}
InitializeChunkStateInternal(pin_state, chunk_state, append_offset, false, true, false, parts);
InitializeChunkStateInternal(pin_state, chunk_state, append_offset, false, true, false, chunk_parts);

// To reduce metadata, we try to merge chunk parts where possible
// Due to the way chunk parts are constructed, only the last part of the first chunk is eligible for merging
@@ -183,10 +179,10 @@ void TupleDataAllocator::InitializeChunkState(TupleDataSegment &segment, TupleDa
// when chunk 0 needs heap block 0, chunk 1 does not need any heap blocks, and chunk 2 needs heap block 0 again
ReleaseOrStoreHandles(pin_state, segment, chunk, !chunk.heap_block_ids.empty());

unsafe_vector<TupleDataChunkPart *> parts;
unsafe_vector<reference<TupleDataChunkPart>> parts;
parts.reserve(chunk.parts.size());
for (auto &part : chunk.parts) {
parts.emplace_back(&part);
parts.emplace_back(part);
}

InitializeChunkStateInternal(pin_state, chunk_state, 0, true, init_heap, init_heap, parts);
@@ -214,17 +210,18 @@ static inline void InitializeHeapSizes(const data_ptr_t row_locations[], idx_t h
void TupleDataAllocator::InitializeChunkStateInternal(TupleDataPinState &pin_state, TupleDataChunkState &chunk_state,
idx_t offset, bool recompute, bool init_heap_pointers,
bool init_heap_sizes,
unsafe_vector<TupleDataChunkPart *> &parts) {
unsafe_vector<reference<TupleDataChunkPart>> &parts) {
auto row_locations = FlatVector::GetData<data_ptr_t>(chunk_state.row_locations);
auto heap_sizes = FlatVector::GetData<idx_t>(chunk_state.heap_sizes);
auto heap_locations = FlatVector::GetData<data_ptr_t>(chunk_state.heap_locations);

for (auto &part : parts) {
const auto next = part->count;
for (auto &part_ref : parts) {
auto &part = part_ref.get();
const auto next = part.count;

// Set up row locations for the scan
const auto row_width = layout.GetRowWidth();
const auto base_row_ptr = GetRowPointer(pin_state, *part);
const auto base_row_ptr = GetRowPointer(pin_state, part);
for (idx_t i = 0; i < next; i++) {
row_locations[offset + i] = base_row_ptr + i * row_width;
}
@@ -234,39 +231,39 @@ void TupleDataAllocator::InitializeChunkStateInternal(TupleDataPinState &pin_sta
continue;
}

if (part->total_heap_size == 0) {
if (part.total_heap_size == 0) {
if (init_heap_sizes) { // No heap, but we need the heap sizes
InitializeHeapSizes(row_locations, heap_sizes, offset, next, *part, layout.GetHeapSizeOffset());
InitializeHeapSizes(row_locations, heap_sizes, offset, next, part, layout.GetHeapSizeOffset());
}
offset += next;
continue;
}

// Check if heap block has changed - re-compute the pointers within each row if so
if (recompute && pin_state.properties != TupleDataPinProperties::ALREADY_PINNED) {
const auto new_base_heap_ptr = GetBaseHeapPointer(pin_state, *part);
if (part->base_heap_ptr != new_base_heap_ptr) {
lock_guard<mutex> guard(part->lock);
const auto old_base_heap_ptr = part->base_heap_ptr;
const auto new_base_heap_ptr = GetBaseHeapPointer(pin_state, part);
if (part.base_heap_ptr != new_base_heap_ptr) {
lock_guard<mutex> guard(part.lock);
const auto old_base_heap_ptr = part.base_heap_ptr;
if (old_base_heap_ptr != new_base_heap_ptr) {
Vector old_heap_ptrs(
Value::POINTER(CastPointerToValue(old_base_heap_ptr + part->heap_block_offset)));
Value::POINTER(CastPointerToValue(old_base_heap_ptr + part.heap_block_offset)));
Vector new_heap_ptrs(
Value::POINTER(CastPointerToValue(new_base_heap_ptr + part->heap_block_offset)));
Value::POINTER(CastPointerToValue(new_base_heap_ptr + part.heap_block_offset)));
RecomputeHeapPointers(old_heap_ptrs, *ConstantVector::ZeroSelectionVector(), row_locations,
new_heap_ptrs, offset, next, layout, 0);
part->base_heap_ptr = new_base_heap_ptr;
part.base_heap_ptr = new_base_heap_ptr;
}
}
}

if (init_heap_sizes) {
InitializeHeapSizes(row_locations, heap_sizes, offset, next, *part, layout.GetHeapSizeOffset());
InitializeHeapSizes(row_locations, heap_sizes, offset, next, part, layout.GetHeapSizeOffset());
}

if (init_heap_pointers) {
// Set the pointers where the heap data will be written (if needed)
heap_locations[offset] = part->base_heap_ptr + part->heap_block_offset;
heap_locations[offset] = part.base_heap_ptr + part.heap_block_offset;
for (idx_t i = 1; i < next; i++) {
auto idx = offset + i;
heap_locations[idx] = heap_locations[idx - 1] + heap_sizes[idx - 1];
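A note on the signature change above: InitializeChunkStateInternal now takes unsafe_vector<reference<TupleDataChunkPart>> instead of a vector of raw pointers. The sketch below illustrates only that part of the change, assuming DuckDB's reference<T> behaves like std::reference_wrapper<T>; the Part / TouchAll names are illustrative, not DuckDB code.

// Sketch: a vector of reference_wrapper is pointer-sized but can never hold null.
#include <functional>
#include <iostream>
#include <vector>

struct Part {
	size_t count = 0;
};

// Takes non-owning, never-null references to parts and mutates them in place.
void TouchAll(std::vector<std::reference_wrapper<Part>> &parts) {
	for (auto &part_ref : parts) {
		auto &part = part_ref.get(); // same spelling as in the diff above: part_ref.get()
		part.count++;
	}
}

int main() {
	std::vector<Part> storage(3);

	std::vector<std::reference_wrapper<Part>> parts;
	parts.reserve(storage.size());
	for (auto &part : storage) {
		parts.emplace_back(part); // wraps Part& without taking its address explicitly
	}

	TouchAll(parts);
	std::cout << storage[0].count << storage[1].count << storage[2].count << "\n"; // 111
	return 0;
}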
2 changes: 1 addition & 1 deletion src/common/types/row/tuple_data_collection.cpp
@@ -254,7 +254,7 @@ void TupleDataCollection::Build(TupleDataPinState &pin_state, TupleDataChunkStat
const idx_t append_offset, const idx_t append_count) {
auto &segment = segments.back();
const auto size_before = segment.SizeInBytes();
segment.allocator->Build(segments.back(), pin_state, chunk_state, append_offset, append_count);
segment.allocator->Build(segment, pin_state, chunk_state, append_offset, append_count);
data_size += segment.SizeInBytes() - size_before;
count += append_count;
Verify();
1 change: 1 addition & 0 deletions src/common/types/row/tuple_data_segment.cpp
@@ -28,6 +28,7 @@ TupleDataChunkPart &TupleDataChunkPart::operator=(TupleDataChunkPart &&other) no
}

TupleDataChunk::TupleDataChunk() : count(0) {
parts.reserve(2);
}

static inline void SwapTupleDataChunk(TupleDataChunk &a, TupleDataChunk &b) noexcept {
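A note on the one-line change above: TupleDataChunk::parts now reserves a small capacity up front, presumably because most chunks end up with only one or two parts, so the first few push_backs avoid a grow-and-copy. A tiny standalone sketch of what reserve buys (not DuckDB code):

#include <iostream>
#include <vector>

int main() {
	std::vector<int> parts;
	parts.reserve(2); // one allocation up front for the common case
	const auto capacity_before = parts.capacity();
	parts.push_back(1);
	parts.push_back(2);
	// no reallocation happened while staying within the reserved capacity
	std::cout << (parts.capacity() == capacity_before) << "\n"; // 1
	return 0;
}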
1 change: 1 addition & 0 deletions src/function/cast/cast_function_set.cpp
@@ -1,3 +1,4 @@

#include "duckdb/function/cast/cast_function_set.hpp"

#include "duckdb/common/pair.hpp"
39 changes: 17 additions & 22 deletions src/include/duckdb/common/fixed_size_map.hpp
@@ -8,6 +8,7 @@

#pragma once

#include "duckdb/common/pair.hpp"
#include "duckdb/common/types.hpp"
#include "duckdb/common/types/validity_mask.hpp"

@@ -20,6 +21,11 @@ template <typename T>
class fixed_size_map_t {
friend struct fixed_size_map_iterator_t<T>;

public:
using key_type = idx_t;
using mapped_type = T;
using iterator = fixed_size_map_iterator_t<T>;

public:
explicit fixed_size_map_t(idx_t capacity_p = 0) : capacity(capacity_p) {
resize(capacity);
@@ -30,10 +36,9 @@ class fixed_size_map_t {
}

void resize(idx_t capacity_p) {
D_ASSERT(capacity_p % 8 == 0);
capacity = capacity_p;
occupied = ValidityMask(capacity);
values = make_unsafe_uniq_array<T>(capacity);
values = make_unsafe_uniq_array<T>(capacity + 1);
clear();
}

@@ -68,7 +73,7 @@ class fixed_size_map_t {
return fixed_size_map_iterator_t<T>(capacity, *this);
}

fixed_size_map_iterator_t<T> find(idx_t index) {
fixed_size_map_iterator_t<T> find(const idx_t &index) {
if (occupied.RowIsValid(index)) {
return fixed_size_map_iterator_t<T>(index, *this);
} else {
@@ -87,13 +92,12 @@
template <typename T>
struct fixed_size_map_iterator_t {
public:
fixed_size_map_iterator_t(idx_t index_p, fixed_size_map_t<T> &map_p) : map(map_p), current(index_p, map[0]) {
fixed_size_map_iterator_t(idx_t index_p, fixed_size_map_t<T> &map_p) : map(map_p), current(index_p) {
}

fixed_size_map_iterator_t<T> &operator++() {
for (current.first++; current.first < map.capacity; current.first++) {
if (map.occupied.RowIsValid(current.first)) {
current.second = map[index()];
for (current++; current < map.capacity; current++) {
if (map.occupied.RowIsValid(current)) {
break;
}
}
@@ -106,34 +110,25 @@ struct fixed_size_map_iterator_t {
return tmp;
}

std::pair<idx_t, T &> *operator->() {
return &current;
const idx_t &GetKey() const {
return current;
}

std::pair<idx_t, T &> &operator*() {
return current;
T &GetValue() {
return map.values[current];
}

friend bool operator==(const fixed_size_map_iterator_t<T> &a, const fixed_size_map_iterator_t<T> &b) {
return a.index() == b.index();
return a.current == b.current;
}

friend bool operator!=(const fixed_size_map_iterator_t<T> &a, const fixed_size_map_iterator_t<T> &b) {
return !(a == b);
}

private:
idx_t &index() {
return current.first;
}

const idx_t &index() const {
return current.first;
}

private:
fixed_size_map_t<T> &map;
std::pair<idx_t, T &> current;
idx_t current;
};

} // namespace duckdb
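A note on the header changes above: fixed_size_map_t now advertises key_type, mapped_type, and iterator so it can stand in for an unordered map in templated code, and its iterator stores only the current index — GetKey() returns that index and GetValue() reads straight from the values array — instead of carrying a std::pair<idx_t, T &> that had to be refreshed on every increment. Below is a minimal, standalone sketch of that design; it is not the DuckDB class, and std::vector<bool> merely stands in for ValidityMask.

// Sketch: a fixed-size map keyed by dense indices, with an index-only iterator.
#include <cstdint>
#include <iostream>
#include <vector>

using idx_t = uint64_t;

template <class T>
class FixedSizeMap {
public:
	struct Iterator {
		FixedSizeMap &map;
		idx_t current;

		Iterator &operator++() {
			// advance to the next occupied slot (or to capacity == end)
			for (++current; current < map.capacity && !map.occupied[current]; ++current) {
			}
			return *this;
		}
		bool operator!=(const Iterator &other) const { return current != other.current; }
		idx_t GetKey() const { return current; }
		T &GetValue() { return map.values[current]; }
	};

	explicit FixedSizeMap(idx_t capacity_p) : capacity(capacity_p), occupied(capacity_p, false), values(capacity_p) {}

	T &operator[](idx_t index) {
		occupied[index] = true;
		return values[index];
	}

	Iterator begin() {
		idx_t first = 0;
		while (first < capacity && !occupied[first]) {
			++first;
		}
		return {*this, first};
	}
	Iterator end() { return {*this, capacity}; }

private:
	idx_t capacity;
	std::vector<bool> occupied;
	std::vector<T> values;
};

int main() {
	FixedSizeMap<idx_t> map(8);
	map[2] = 10;
	map[5] = 20;
	for (auto it = map.begin(); it != map.end(); ++it) {
		std::cout << it.GetKey() << " -> " << it.GetValue() << "\n"; // 2 -> 10, then 5 -> 20
	}
	return 0;
}

Because the iterator is just an index, incrementing it only scans the occupancy bitmap and never touches the values array until GetValue() is called.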
src/include/duckdb/common/types/row/partitioned_tuple_data.hpp
@@ -141,12 +141,12 @@ class PartitionedTupleData {
//! - returns true if everything belongs to the same partition - stores partition index in single_partition_idx
void BuildPartitionSel(PartitionedTupleDataAppendState &state, const SelectionVector &append_sel,
const idx_t append_count);
template <class MAP_TYPE>
template <class MAP_TYPE, class GETTER>
void BuildPartitionSel(PartitionedTupleDataAppendState &state, MAP_TYPE &partition_entries,
const SelectionVector &append_sel, const idx_t append_count);
//! Builds out the buffer space in the partitions
void BuildBufferSpace(PartitionedTupleDataAppendState &state);
template <class MAP_TYPE>
template <class MAP_TYPE, class GETTER>
void BuildBufferSpace(PartitionedTupleDataAppendState &state, MAP_TYPE &partition_entries);
//! Create a collection for a specific partition
unique_ptr<TupleDataCollection> CreatePartitionCollection(idx_t partition_index) const {
6 changes: 5 additions & 1 deletion src/include/duckdb/common/types/row/tuple_data_allocator.hpp
@@ -87,7 +87,7 @@ class TupleDataAllocator {
//! Internal function for InitializeChunkState
void InitializeChunkStateInternal(TupleDataPinState &pin_state, TupleDataChunkState &chunk_state, idx_t offset,
bool recompute, bool init_heap_pointers, bool init_heap_sizes,
unsafe_vector<TupleDataChunkPart *> &parts);
unsafe_vector<reference<TupleDataChunkPart>> &parts);
//! Internal function for ReleaseOrStoreHandles
static void ReleaseOrStoreHandlesInternal(TupleDataSegment &segment,
unsafe_vector<BufferHandle> &pinned_row_handles,
Expand All @@ -111,6 +111,10 @@ class TupleDataAllocator {
unsafe_vector<TupleDataBlock> row_blocks;
//! Blocks storing the variable-size data of the fixed-size rows (e.g., string, list)
unsafe_vector<TupleDataBlock> heap_blocks;

//! Re-usable arrays used while building buffer space
unsafe_vector<reference<TupleDataChunkPart>> chunk_parts;
unsafe_vector<pair<idx_t, idx_t>> chunk_part_indices;
};

} // namespace duckdb
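A note on the new members above: chunk_parts and chunk_part_indices back the changes in tuple_data_allocator.cpp earlier in this commit, replacing function-local vectors so that repeated Build calls reuse the same allocations — clear() drops the elements but keeps the capacity. A standalone sketch of that scratch-member pattern, with illustrative Allocator / BuildIndices names (not the DuckDB code):

#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

class Allocator {
public:
	// Hot-path function: clear() keeps the allocation from the previous call.
	size_t BuildIndices(size_t chunk_count, size_t parts_per_chunk) {
		chunk_part_indices.clear();
		for (size_t chunk_idx = 0; chunk_idx < chunk_count; chunk_idx++) {
			for (size_t part_idx = 0; part_idx < parts_per_chunk; part_idx++) {
				chunk_part_indices.emplace_back(chunk_idx, part_idx);
			}
		}
		return chunk_part_indices.size();
	}

private:
	// Re-usable scratch array, mirroring the members added above.
	std::vector<std::pair<size_t, size_t>> chunk_part_indices;
};

int main() {
	Allocator allocator;
	std::cout << allocator.BuildIndices(2, 3) << "\n"; // 6 (allocates)
	std::cout << allocator.BuildIndices(2, 3) << "\n"; // 6 (re-uses capacity from the first call)
	return 0;
}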
(The diff of the ninth changed file was not rendered in this view.)
