Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logic to serialize/deserialize SetAccumulators #8660

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 244 additions & 15 deletions velox/exec/SetAccumulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#pragma once

#include <folly/container/F14Set.h>

#include "velox/common/base/IOUtils.h"
#include "velox/common/memory/HashStringAllocator.h"
#include "velox/exec/AddressableNonNullValueList.h"
#include "velox/exec/Strings.h"
Expand All @@ -29,6 +31,22 @@ namespace detail {

/// Maintains a set of unique values. Non-null values are stored in F14FastSet.
/// A separate flag tracks presence of the null value.
/// The SetAccumulator also tracks the order in which the values are added to
/// the accumulator (for ordered aggregations). So each value is associated with
/// an index of its position.

/// SetAccumulator supports serialization/deserialization to/from an array of
/// byte streams.
/// These are used in the spilling logic of operators using SetAccumulator.

/// The serialization format is :
/// i) The first element of the array is the index of the null value
/// (or -1 if no null value).
/// ii) The list of values (and optionally some metadata) in the order of
/// their positions. The null position (if one) is skipped.
/// iii) For scalar and string types, only the value is serialized in the
/// order of their indexes in the accumulator. For a complex type, a tuple
/// of (hash, value) is serialized.
template <
typename T,
typename Hash = std::hash<T>,
Expand Down Expand Up @@ -114,6 +132,26 @@ struct SetAccumulator {
}
}

/// Deserializes accumulator from previously serialized value.
void deserialize(
const FlatVector<StringView>& vector,
vector_size_t index,
vector_size_t size,
HashStringAllocator* /*allocator*/) {
// The serialized value is the nullIndex (kNoNullIndex if no null is
// present) followed by the unique values ordered by index.
deserializeNullIndex(vector.valueAt(index).data());

// Mark the nullPosition beyond values to correctly offset when reading the
// stream.
const auto nullPosition = nullIndex.has_value() ? nullIndex.value() : size;
for (auto i = 1, j = 0; i < size; i++, j++) {
T value = *reinterpret_cast<const T*>(vector.valueAt(index + i).data());
auto pos = (j < nullPosition) ? j : i;
uniqueValues.insert({value, pos});
}
}

/// Returns number of unique values including null.
size_t size() const {
return uniqueValues.size() + (nullIndex.has_value() ? 1 : 0);
Expand All @@ -134,10 +172,51 @@ struct SetAccumulator {
: uniqueValues.size();
}

/// Serializes a sequence of VARBINARY values starting at result[index].
/// This is used for the spill of this accumulator.
void serialize(const VectorPtr& result, vector_size_t index) {
auto* flatResult = result->as<FlatVector<StringView>>();
VELOX_CHECK_LE(uniqueValues.size() + 1, flatResult->size());

auto nullIndexValue = nullIndexSerializationValue();
flatResult->set(
index,
StringView(
reinterpret_cast<const char*>(&nullIndexValue),
sizeof(vector_size_t)));

// The null position is skipped when serializing values, so setting an out
// of bound value for no null position.
const auto nullPosition =
nullIndex.has_value() ? nullIndex.value() : uniqueValues.size();
const auto sizeOfT = sizeof(T);
for (const auto& value : uniqueValues) {
auto pos = value.second;
auto offset = (pos < nullPosition ? pos : pos - 1) + index + 1;
flatResult->set(
offset,
StringView(reinterpret_cast<const char*>(&value.first), sizeOfT));
}
}

void free(HashStringAllocator& allocator) {
using UT = decltype(uniqueValues);
uniqueValues.~UT();
}

void deserializeNullIndex(const char* input) {
VELOX_CHECK(!nullIndex.has_value());
auto streamNullIndex = *reinterpret_cast<const vector_size_t*>(input);
if (streamNullIndex != kNoNullIndex) {
nullIndex = streamNullIndex;
}
}

vector_size_t nullIndexSerializationValue() {
return nullIndex.has_value() ? nullIndex.value() : kNoNullIndex;
}

static const vector_size_t kNoNullIndex = -1;
};

/// Maintains a set of unique strings.
Expand All @@ -162,14 +241,7 @@ struct StringViewSetAccumulator {
}
} else {
auto value = decoded.valueAt<StringView>(index);
if (!value.isInline()) {
if (base.uniqueValues.contains(value)) {
return;
}
value = strings.append(value, *allocator);
}
base.uniqueValues.insert(
{value, base.nullIndex.has_value() ? cnt + 1 : cnt});
addValue(value, base.nullIndex.has_value() ? cnt + 1 : cnt, allocator);
}
}

Expand Down Expand Up @@ -218,6 +290,23 @@ struct StringViewSetAccumulator {
}
}

void deserialize(
const FlatVector<StringView>& vector,
vector_size_t index,
vector_size_t size,
HashStringAllocator* allocator) {
base.deserializeNullIndex(vector.valueAt(index).data());

// Mark the nullPosition beyond values to correctly offset when reading the
// stream.
const auto nullPosition =
base.nullIndex.has_value() ? base.nullIndex.value() : size;
for (auto i = 1; i < size; i++) {
auto pos = i - 1 < nullPosition ? i - 1 : i;
addUniqueValue(vector.valueAt(index + i), pos, allocator);
}
}

size_t size() const {
return base.size();
}
Expand All @@ -228,11 +317,57 @@ struct StringViewSetAccumulator {
return base.extractValues(values, offset);
}

/// Serialize an array of VARBINARY representation starting from
/// result[index]. This is used for the spill of this accumulator.
void serialize(const VectorPtr& result, vector_size_t index) {
auto* flatResult = result->as<FlatVector<StringView>>();
VELOX_CHECK_LE(base.uniqueValues.size() + 1, flatResult->size());

auto nullIndexValue = base.nullIndexSerializationValue();
flatResult->set(
index, StringView((const char*)&nullIndexValue, sizeof(vector_size_t)));

// The null position is skipped when serializing values, so setting an out
// of bound value for no null position.
const auto nullPosition = base.nullIndex.has_value()
? base.nullIndex.value()
: base.uniqueValues.size();
for (const auto& value : base.uniqueValues) {
auto pos = value.second;
auto offset = (pos < nullPosition ? pos : pos - 1) + index + 1;
flatResult->set(offset, value.first);
}
}

void free(HashStringAllocator& allocator) {
strings.free(allocator);
using Base = decltype(base);
base.~Base();
}

private:
void addValue(
const StringView& value,
vector_size_t index,
HashStringAllocator* allocator) {
if (base.uniqueValues.contains(value)) {
aditi-pandit marked this conversation as resolved.
Show resolved Hide resolved
return;
}

addUniqueValue(value, index, allocator);
}

void addUniqueValue(
const StringView& value,
vector_size_t index,
HashStringAllocator* allocator) {
VELOX_CHECK(!base.uniqueValues.contains(value));
StringView valueCopy = value;
if (!valueCopy.isInline()) {
valueCopy = strings.append(value, *allocator);
}
base.uniqueValues.insert({valueCopy, index});
}
};

/// Maintains a set of unique arrays, maps or structs.
Expand All @@ -247,6 +382,12 @@ struct ComplexTypeSetAccumulator {
/// Stores unique non-null values.
AddressableNonNullValueList values;

// Tracks the size of the biggest ComplexType in the set. This is used for
// allocating a temporary buffer during serialization.
size_t maxSize = 0;

static constexpr size_t kSizeOfHash = sizeof(uint64_t);

ComplexTypeSetAccumulator(const TypePtr& type, HashStringAllocator* allocator)
: base{
AddressableNonNullValueList::Hash{},
Expand All @@ -263,13 +404,9 @@ struct ComplexTypeSetAccumulator {
base.nullIndex = cnt;
}
} else {
auto entry = values.append(decoded, index, allocator);

if (!base.uniqueValues
.insert({entry, base.nullIndex.has_value() ? cnt + 1 : cnt})
.second) {
values.removeLast(entry);
}
const auto entry = values.append(decoded, index, allocator);
const auto position = base.nullIndex.has_value() ? cnt + 1 : cnt;
addEntry(entry, position);
}
}

Expand Down Expand Up @@ -315,6 +452,30 @@ struct ComplexTypeSetAccumulator {
}
}

void deserialize(
const FlatVector<StringView>& vector,
vector_size_t index,
vector_size_t size,
HashStringAllocator* allocator) {
base.deserializeNullIndex(vector.valueAt(index).data());

// Mark the nullPosition beyond values to correctly offset when reading the
// stream.
const auto nullPosition =
base.nullIndex.has_value() ? base.nullIndex.value() : size;
for (auto i = 1; i < size; i++) {
auto value = vector.valueAt(index + i);
auto stream = common::InputByteStream(value.data());
auto hash = stream.read<uint64_t>();
auto length = value.size() - kSizeOfHash;
auto contents = StringView(stream.read<char>(length), length);
auto position = values.appendSerialized(contents, allocator);

auto pos = (i - 1 < nullPosition) ? i - 1 : i;
addEntry({position, contents.size(), hash}, pos);
}
}

size_t size() const {
return base.size();
}
Expand All @@ -332,11 +493,79 @@ struct ComplexTypeSetAccumulator {
return base.uniqueValues.size() + (base.nullIndex.has_value() ? 1 : 0);
}

/// Starting from result[index] append a sequence of VARBINARY values for
/// serialization for spilling..
void serialize(const VectorPtr& result, vector_size_t index) {
auto* flatResult = result->as<FlatVector<StringView>>();
VELOX_CHECK_LE(base.uniqueValues.size() + 1, flatResult->size());

auto nullIndexValue = base.nullIndexSerializationValue();
flatResult->set(
index, StringView((const char*)&nullIndexValue, sizeof(vector_size_t)));

// Temporary buffer used during serialization.
auto* tempBuffer =
(char*)(flatResult->pool()->allocate(maxSize + kSizeOfHash));

// The null position is skipped when serializing values, so setting an out
// of bound value for no null position.
const auto nullPosition = base.nullIndex.has_value()
? base.nullIndex.value()
: base.uniqueValues.size();
for (const auto& value : base.uniqueValues) {
auto pos = value.second;
auto offset = (pos < nullPosition ? pos : pos - 1) + index + 1;
SerializationStream stream(tempBuffer, kSizeOfHash + value.first.size);
// Complex type hash.
stream.append(&value.first.hash, kSizeOfHash);
// Complex type value.
stream.append(value.first);
flatResult->set(
offset, StringView(tempBuffer, kSizeOfHash + value.first.size));
}

flatResult->pool()->free(tempBuffer, maxSize + kSizeOfHash);
}

void free(HashStringAllocator& allocator) {
values.free(allocator);
using Base = decltype(base);
base.~Base();
}

private:
// Simple stream abstraction for serialization logic. 'append' calls to concat
// values to the stream (for the input buffer) are exposed to the user.
struct SerializationStream {
char* rawBuffer;
const vector_size_t totalSize;
vector_size_t offset = 0;

SerializationStream(char* buffer, vector_size_t totalSize)
: rawBuffer(buffer), totalSize(totalSize) {}

void append(const void* value, vector_size_t size) {
VELOX_CHECK_LE(offset + size, totalSize);
memcpy(rawBuffer + offset, value, size);
offset += size;
}

void append(const AddressableNonNullValueList::Entry& entry) {
VELOX_CHECK_LE(offset + entry.size, totalSize);
AddressableNonNullValueList::readSerialized(entry, rawBuffer + offset);
offset += entry.size;
}
};

void addEntry(
const AddressableNonNullValueList::Entry& entry,
vector_size_t index) {
if (!base.uniqueValues.insert({entry, index}).second) {
values.removeLast(entry);
} else {
maxSize = maxSize < entry.size ? entry.size : maxSize;
}
}
};

template <typename T>
Expand Down
1 change: 1 addition & 0 deletions velox/exec/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ add_executable(
RoundRobinPartitionFunctionTest.cpp
RowContainerTest.cpp
RowNumberTest.cpp
SetAccumulatorTest.cpp
SortBufferTest.cpp
SpillerTest.cpp
SpillTest.cpp
Expand Down
Loading
Loading