Skip to content

Commit

Permalink
Add support to serialize/deserialize SetAccumulators
Browse files Browse the repository at this point in the history
  • Loading branch information
aditi-pandit committed Jul 17, 2024
1 parent 86efb63 commit b8fd99e
Show file tree
Hide file tree
Showing 3 changed files with 542 additions and 15 deletions.
259 changes: 244 additions & 15 deletions velox/exec/SetAccumulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#pragma once

#include <folly/container/F14Set.h>

#include "velox/common/base/IOUtils.h"
#include "velox/common/memory/HashStringAllocator.h"
#include "velox/exec/AddressableNonNullValueList.h"
#include "velox/exec/Strings.h"
Expand All @@ -29,6 +31,22 @@ namespace detail {

/// Maintains a set of unique values. Non-null values are stored in F14FastSet.
/// A separate flag tracks presence of the null value.
/// The SetAccumulator also tracks the order in which the values are added to
/// the accumulator (for ordered aggregations). So each value is associated with
/// an index of its position.

/// SetAccumulator supports serialization/deserialization to/from an array of
/// byte streams.
/// These are used in the spilling logic of operators using SetAccumulator.

/// The serialization format is:
/// i) The first element of the array is the index of the null value
/// (or -1 if there is no null value).
/// ii) The list of values (and optionally some metadata) in the order of
/// their positions. The null position (if any) is skipped.
/// iii) For scalar and string types, only the value is serialized, in the
/// order of their indexes in the accumulator. For a complex type, a tuple
/// of (hash, value) is serialized.
template <
typename T,
typename Hash = std::hash<T>,
Expand Down Expand Up @@ -114,6 +132,26 @@ struct SetAccumulator {
}
}

/// Deserializes accumulator from previously serialized value.
void deserialize(
const FlatVector<StringView>& vector,
vector_size_t index,
vector_size_t size,
HashStringAllocator* /*allocator*/) {
// The serialized value is the nullIndex (kNoNullIndex if no null is
// present) followed by the unique values ordered by index.
deserializeNullIndex(vector.valueAt(index).data());

// Mark the nullPosition beyond values to correctly offset when reading the
// stream.
const auto nullPosition = nullIndex.has_value() ? nullIndex.value() : size;
for (auto i = 1, j = 0; i < size; i++, j++) {
T value = *reinterpret_cast<const T*>(vector.valueAt(index + i).data());
auto pos = (j < nullPosition) ? j : i;
uniqueValues.insert({value, pos});
}
}

/// Returns number of unique values including null.
size_t size() const {
return uniqueValues.size() + (nullIndex.has_value() ? 1 : 0);
Expand All @@ -134,10 +172,51 @@ struct SetAccumulator {
: uniqueValues.size();
}

/// Serializes a sequence of VARBINARY values starting at result[index].
/// This is used for the spill of this accumulator.
///
/// result[index] receives the null index (kNoNullIndex if no null is
/// present). The following uniqueValues.size() rows receive the raw bytes of
/// each unique value, ordered by position with the null position skipped.
void serialize(const VectorPtr& result, vector_size_t index) {
  auto* flatResult = result->as<FlatVector<StringView>>();
  // Rows [index, index + uniqueValues.size()] are written below, so the check
  // has to account for the starting row and not only for the row count.
  VELOX_CHECK_LE(index + uniqueValues.size() + 1, flatResult->size());

  auto nullIndexValue = nullIndexSerializationValue();
  flatResult->set(
      index,
      StringView(
          reinterpret_cast<const char*>(&nullIndexValue),
          sizeof(vector_size_t)));

  // The null position is skipped when serializing values, so setting an out
  // of bound value for no null position.
  const auto nullPosition =
      nullIndex.has_value() ? nullIndex.value() : uniqueValues.size();
  const auto sizeOfT = sizeof(T);
  for (const auto& value : uniqueValues) {
    auto pos = value.second;
    // Positions after the null shift down by one; '+ 1' skips the row that
    // holds the null index.
    auto offset = (pos < nullPosition ? pos : pos - 1) + index + 1;
    flatResult->set(
        offset,
        StringView(reinterpret_cast<const char*>(&value.first), sizeOfT));
  }
}

/// Destroys the container of unique values in place. 'allocator' is not used
/// by this accumulator.
void free(HashStringAllocator& allocator) {
  using UniqueValuesType = decltype(uniqueValues);
  uniqueValues.~UniqueValuesType();
}

void deserializeNullIndex(const char* input) {
VELOX_CHECK(!nullIndex.has_value());
auto streamNullIndex = *reinterpret_cast<const vector_size_t*>(input);
if (streamNullIndex != kNoNullIndex) {
nullIndex = streamNullIndex;
}
}

/// Returns the value to write for the null index when serializing: the
/// recorded null index if there is one, kNoNullIndex otherwise.
vector_size_t nullIndexSerializationValue() {
  if (nullIndex.has_value()) {
    return nullIndex.value();
  }
  return kNoNullIndex;
}

/// Sentinel serialized in place of the null index when the accumulator holds
/// no null value. Declared constexpr so that it needs no out-of-class
/// definition even when bound to a reference.
static constexpr vector_size_t kNoNullIndex = -1;
};

/// Maintains a set of unique strings.
Expand All @@ -162,14 +241,7 @@ struct StringViewSetAccumulator {
}
} else {
auto value = decoded.valueAt<StringView>(index);
if (!value.isInline()) {
if (base.uniqueValues.contains(value)) {
return;
}
value = strings.append(value, *allocator);
}
base.uniqueValues.insert(
{value, base.nullIndex.has_value() ? cnt + 1 : cnt});
addValue(value, base.nullIndex.has_value() ? cnt + 1 : cnt, allocator);
}
}

Expand Down Expand Up @@ -218,6 +290,23 @@ struct StringViewSetAccumulator {
}
}

void deserialize(
const FlatVector<StringView>& vector,
vector_size_t index,
vector_size_t size,
HashStringAllocator* allocator) {
base.deserializeNullIndex(vector.valueAt(index).data());

// Mark the nullPosition beyond values to correctly offset when reading the
// stream.
const auto nullPosition =
base.nullIndex.has_value() ? base.nullIndex.value() : size;
for (auto i = 1; i < size; i++) {
auto pos = i - 1 < nullPosition ? i - 1 : i;
addUniqueValue(vector.valueAt(index + i), pos, allocator);
}
}

/// Returns the size reported by the underlying base accumulator.
size_t size() const {
  const size_t numValues = base.size();
  return numValues;
}
Expand All @@ -228,11 +317,57 @@ struct StringViewSetAccumulator {
return base.extractValues(values, offset);
}

/// Serializes this accumulator as a sequence of VARBINARY values starting at
/// result[index]. This is used for the spill of this accumulator.
///
/// result[index] receives the null index (kNoNullIndex if no null is
/// present). The following base.uniqueValues.size() rows receive the unique
/// strings, ordered by position with the null position skipped.
void serialize(const VectorPtr& result, vector_size_t index) {
  auto* flatResult = result->as<FlatVector<StringView>>();
  // Rows [index, index + uniqueValues.size()] are written below, so the check
  // has to account for the starting row and not only for the row count.
  VELOX_CHECK_LE(index + base.uniqueValues.size() + 1, flatResult->size());

  auto nullIndexValue = base.nullIndexSerializationValue();
  flatResult->set(
      index,
      StringView(
          reinterpret_cast<const char*>(&nullIndexValue),
          sizeof(vector_size_t)));

  // The null position is skipped when serializing values, so setting an out
  // of bound value for no null position.
  const auto nullPosition = base.nullIndex.has_value()
      ? base.nullIndex.value()
      : base.uniqueValues.size();
  for (const auto& value : base.uniqueValues) {
    auto pos = value.second;
    // Positions after the null shift down by one; '+ 1' skips the row that
    // holds the null index.
    auto offset = (pos < nullPosition ? pos : pos - 1) + index + 1;
    flatResult->set(offset, value.first);
  }
}

/// Frees 'strings' using 'allocator', then destroys the base accumulator in
/// place.
void free(HashStringAllocator& allocator) {
  using BaseAccumulator = decltype(base);
  strings.free(allocator);
  base.~BaseAccumulator();
}

private:
// Adds 'value' with position 'index' unless the set already contains it.
void addValue(
    const StringView& value,
    vector_size_t index,
    HashStringAllocator* allocator) {
  const bool alreadyPresent = base.uniqueValues.contains(value);
  if (!alreadyPresent) {
    addUniqueValue(value, index, allocator);
  }
}

void addUniqueValue(
const StringView& value,
vector_size_t index,
HashStringAllocator* allocator) {
VELOX_CHECK(!base.uniqueValues.contains(value));
StringView valueCopy = value;
if (!valueCopy.isInline()) {
valueCopy = strings.append(value, *allocator);
}
base.uniqueValues.insert({valueCopy, index});
}
};

/// Maintains a set of unique arrays, maps or structs.
Expand All @@ -247,6 +382,12 @@ struct ComplexTypeSetAccumulator {
/// Stores unique non-null values. Entries kept in 'base.uniqueValues' are
/// produced by appending to this list.
AddressableNonNullValueList values;

// Tracks the size of the biggest ComplexType in the set. This is used for
// allocating a temporary buffer during serialization. Updated whenever a new
// entry is successfully inserted into the set.
size_t maxSize = 0;

// Number of bytes taken by the hash that prefixes every serialized complex
// value. The hash is read back as a uint64_t during deserialization.
static constexpr size_t kSizeOfHash = sizeof(uint64_t);

ComplexTypeSetAccumulator(const TypePtr& type, HashStringAllocator* allocator)
: base{
AddressableNonNullValueList::Hash{},
Expand All @@ -263,13 +404,9 @@ struct ComplexTypeSetAccumulator {
base.nullIndex = cnt;
}
} else {
auto entry = values.append(decoded, index, allocator);

if (!base.uniqueValues
.insert({entry, base.nullIndex.has_value() ? cnt + 1 : cnt})
.second) {
values.removeLast(entry);
}
const auto entry = values.append(decoded, index, allocator);
const auto position = base.nullIndex.has_value() ? cnt + 1 : cnt;
addEntry(entry, position);
}
}

Expand Down Expand Up @@ -315,6 +452,30 @@ struct ComplexTypeSetAccumulator {
}
}

void deserialize(
const FlatVector<StringView>& vector,
vector_size_t index,
vector_size_t size,
HashStringAllocator* allocator) {
base.deserializeNullIndex(vector.valueAt(index).data());

// Mark the nullPosition beyond values to correctly offset when reading the
// stream.
const auto nullPosition =
base.nullIndex.has_value() ? base.nullIndex.value() : size;
for (auto i = 1; i < size; i++) {
auto value = vector.valueAt(index + i);
auto stream = common::InputByteStream(value.data());
auto hash = stream.read<uint64_t>();
auto length = value.size() - kSizeOfHash;
auto contents = StringView(stream.read<char>(length), length);
auto position = values.appendSerialized(contents, allocator);

auto pos = (i - 1 < nullPosition) ? i - 1 : i;
addEntry({position, contents.size(), hash}, pos);
}
}

/// Returns the size reported by the underlying base accumulator.
size_t size() const {
  const size_t numValues = base.size();
  return numValues;
}
Expand All @@ -332,11 +493,79 @@ struct ComplexTypeSetAccumulator {
return base.uniqueValues.size() + (base.nullIndex.has_value() ? 1 : 0);
}

/// Starting from result[index], writes a sequence of VARBINARY values that
/// serialize this accumulator for spilling.
///
/// result[index] receives the null index (kNoNullIndex if no null is
/// present). Each of the following base.uniqueValues.size() rows receives the
/// hash of a unique value followed by its serialized contents, ordered by
/// position with the null position skipped.
void serialize(const VectorPtr& result, vector_size_t index) {
  auto* flatResult = result->as<FlatVector<StringView>>();
  // Rows [index, index + uniqueValues.size()] are written below, so the check
  // has to account for the starting row and not only for the row count.
  VELOX_CHECK_LE(index + base.uniqueValues.size() + 1, flatResult->size());

  auto nullIndexValue = base.nullIndexSerializationValue();
  flatResult->set(
      index,
      StringView(
          reinterpret_cast<const char*>(&nullIndexValue),
          sizeof(vector_size_t)));

  // Temporary buffer used during serialization. It is sized for the largest
  // value in the set plus its hash and is reused for every value.
  auto* pool = flatResult->pool();
  const auto tempBufferSize = maxSize + kSizeOfHash;
  auto* tempBuffer = static_cast<char*>(pool->allocate(tempBufferSize));

  // The null position is skipped when serializing values, so setting an out
  // of bound value for no null position.
  const auto nullPosition = base.nullIndex.has_value()
      ? base.nullIndex.value()
      : base.uniqueValues.size();
  try {
    for (const auto& value : base.uniqueValues) {
      auto pos = value.second;
      // Positions after the null shift down by one; '+ 1' skips the row that
      // holds the null index.
      auto offset = (pos < nullPosition ? pos : pos - 1) + index + 1;
      SerializationStream stream(tempBuffer, kSizeOfHash + value.first.size);
      // Complex type hash.
      stream.append(&value.first.hash, kSizeOfHash);
      // Complex type value.
      stream.append(value.first);
      flatResult->set(
          offset, StringView(tempBuffer, kSizeOfHash + value.first.size));
    }
  } catch (...) {
    // Return the temporary buffer to the pool before propagating the error;
    // otherwise a failure while writing a value would leak it.
    pool->free(tempBuffer, tempBufferSize);
    throw;
  }

  pool->free(tempBuffer, tempBufferSize);
}

/// Frees 'values' using 'allocator', then destroys the base accumulator in
/// place.
void free(HashStringAllocator& allocator) {
  using BaseAccumulator = decltype(base);
  values.free(allocator);
  base.~BaseAccumulator();
}

private:
// Minimal write cursor over a caller-provided buffer, used by the
// serialization logic. Each 'append' call copies data to the current offset
// and advances it; a write that would go past 'totalSize' fails a check.
struct SerializationStream {
  char* rawBuffer;
  const vector_size_t totalSize;
  vector_size_t offset = 0;

  SerializationStream(char* buffer, vector_size_t totalSize)
      : rawBuffer(buffer), totalSize(totalSize) {}

  // Copies 'size' bytes starting at 'value' to the current write position.
  void append(const void* value, vector_size_t size) {
    VELOX_CHECK_LE(offset + size, totalSize);
    char* const destination = rawBuffer + offset;
    memcpy(destination, value, size);
    offset += size;
  }

  // Writes the serialized contents of 'entry' to the current write position.
  void append(const AddressableNonNullValueList::Entry& entry) {
    VELOX_CHECK_LE(offset + entry.size, totalSize);
    char* const destination = rawBuffer + offset;
    AddressableNonNullValueList::readSerialized(entry, destination);
    offset += entry.size;
  }
};

// Inserts 'entry' with position 'index' into the set. On success, 'maxSize'
// is raised to the entry's size if that is larger. If an equal entry is
// already present, 'entry' is removed again from 'values'.
void addEntry(
    const AddressableNonNullValueList::Entry& entry,
    vector_size_t index) {
  const bool inserted = base.uniqueValues.insert({entry, index}).second;
  if (inserted) {
    if (maxSize < entry.size) {
      maxSize = entry.size;
    }
    return;
  }
  values.removeLast(entry);
}
};

template <typename T>
Expand Down
1 change: 1 addition & 0 deletions velox/exec/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ add_executable(
RoundRobinPartitionFunctionTest.cpp
RowContainerTest.cpp
RowNumberTest.cpp
SetAccumulatorTest.cpp
SortBufferTest.cpp
SpillerTest.cpp
SpillTest.cpp
Expand Down
Loading

0 comments on commit b8fd99e

Please sign in to comment.