Skip to content

Commit

Permalink
Add logic to serialize/deserialize SetAccumulators
Browse files Browse the repository at this point in the history
  • Loading branch information
aditi-pandit committed Feb 8, 2024
1 parent 2abff36 commit e2c8b17
Show file tree
Hide file tree
Showing 3 changed files with 538 additions and 17 deletions.
268 changes: 251 additions & 17 deletions velox/exec/SetAccumulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#pragma once

#include <folly/container/F14Set.h>

#include "velox/common/base/IOUtils.h"
#include "velox/common/memory/HashStringAllocator.h"
#include "velox/exec/AddressableNonNullValueList.h"
#include "velox/exec/Strings.h"
Expand All @@ -29,6 +31,21 @@ namespace detail {

/// Maintains a set of unique values. Non-null values are stored in F14FastSet.
/// A separate flag tracks presence of the null value.
/// The SetAccumulator also tracks the order in which the values are added to
/// the accumulator (for ordered aggregations). So each value is associated with
/// an index of its position.

/// SetAccumulator supports serialization/deserialization to/from a bytestream.
/// These are used in the spilling logic of operators using SetAccumulator.

/// The serialization format is:
/// i) The index of the null value (or -1 if there is no null value).
/// ii) The number of unique entries serialized.
/// iii) The values (and optionally some metadata).
/// For a scalar type, only the values are serialized, in the order of their
/// indexes in the accumulator. For a string type, a tuple of (index, length,
/// value) is serialized per entry. For a complex type, a tuple of (index,
/// length, hash, value) is serialized per entry.
template <
typename T,
typename Hash = std::hash<T>,
Expand Down Expand Up @@ -86,6 +103,28 @@ struct SetAccumulator {
}
}

/// Rebuilds this accumulator from a previously serialized value.
///
/// Reads, in order: the null index (kNoNullIndex when no null is present),
/// the number of unique non-null values, and then the values themselves.
/// Values are assigned consecutive indexes as they are read; the index that
/// belongs to the null is stepped over without consuming any bytes.
///
/// @param serialized Bytes to read the accumulator state from.
/// The allocator argument is not used by this overload.
void deserialize(
    const StringView& serialized,
    HashStringAllocator* /*allocator*/) {
  const auto totalBytes = serialized.size();
  common::InputByteStream stream(serialized.data());

  deserializeNullIndex(stream);
  const auto expectedCount = stream.read<size_t>();

  // 'position' walks the accumulator indexes. The null has no bytes in the
  // stream, so its position is skipped without reading.
  for (size_t position = 0; stream.offset() < totalBytes; ++position) {
    if (isNullIndex(position)) {
      continue;
    }
    uniqueValues.insert({stream.read<T>(), position});
  }

  VELOX_CHECK_EQ(expectedCount, uniqueValues.size());
}

/// Returns number of unique values including null.
size_t size() const {
return uniqueValues.size() + (nullIndex.has_value() ? 1 : 0);
Expand All @@ -106,10 +145,59 @@ struct SetAccumulator {
: uniqueValues.size();
}

/// Extracts in result[index] a serialized VARBINARY for the Set Values.
/// This is used for the spill of this accumulator.
void serialize(const VectorPtr& result, vector_size_t index) {
const size_t totalBytes =
kSizeOfVector + sizeof(size_t) + kSizeOfValue * uniqueValues.size();

auto* flatResult = result->as<FlatVector<StringView>>();
auto* rawBuffer = flatResult->getRawStringBufferWithSpace(totalBytes, true);

auto nullIndexValue = nullIndexSerializationValue();
memcpy(rawBuffer, &nullIndexValue, kSizeOfVector);

auto numValues = uniqueValues.size();
memcpy(rawBuffer + kSizeOfVector, &numValues, sizeof(size_t));

// The null position is skipped when serializing values, so setting an out
// of bound value for no null position.
const auto nullPosition =
nullIndex.has_value() ? nullIndex : uniqueValues.size();
for (const auto& value : uniqueValues) {
auto index = value.second;
auto offset = 2 * kSizeOfVector +
(index < nullPosition ? index : index - 1) * kSizeOfValue;
memcpy(rawBuffer + offset, &(value.first), kSizeOfValue);
}

flatResult->setNoCopy(index, StringView(rawBuffer, totalBytes));
}

/// Runs the destructor of 'uniqueValues' in place. The 'allocator' argument
/// is not used by this method.
void free(HashStringAllocator& allocator) {
  using UniqueValues = decltype(uniqueValues);
  uniqueValues.~UniqueValues();
}

/// Reads a vector_size_t null index from 'stream' and stores it in
/// 'nullIndex', unless the value read is the kNoNullIndex sentinel.
/// Fails if this accumulator already has a null index set.
void deserializeNullIndex(common::InputByteStream& stream) {
  VELOX_CHECK(!nullIndex.has_value());
  const auto serializedNullIndex = stream.read<vector_size_t>();
  if (serializedNullIndex == kNoNullIndex) {
    return;
  }
  nullIndex = serializedNullIndex;
}

/// Returns true if a null has been recorded and 'i' is its index.
inline bool isNullIndex(size_t i) {
  if (!nullIndex.has_value()) {
    return false;
  }
  return i == nullIndex.value();
}

/// Returns the value to write for the null index when serializing: the
/// recorded null index, or kNoNullIndex when no null has been recorded.
vector_size_t nullIndexSerializationValue() {
  if (nullIndex.has_value()) {
    return nullIndex.value();
  }
  return kNoNullIndex;
}

/// Sentinel serialized in place of the null index when the set has no null.
/// Declared constexpr (implicitly inline) like the sizes below so that it can
/// be bound to a reference without needing an out-of-class definition.
static constexpr vector_size_t kNoNullIndex = -1;
/// Number of bytes used to serialize a vector_size_t (e.g. the null index).
static constexpr size_t kSizeOfVector = sizeof(vector_size_t);
/// Number of bytes used to serialize a single value of type T.
static constexpr size_t kSizeOfValue = sizeof(T);
};

/// Maintains a set of unique strings.
Expand All @@ -120,6 +208,13 @@ struct StringViewSetAccumulator {
/// Stores unique non-null non-inline strings.
Strings strings;

/// Size (in bytes) of the buffer needed to serialize this accumulator for
/// spilling. It starts at the size of the header (the null index plus the
/// number of unique values) and grows, for every unique string added, by the
/// length of the string plus the bytes used to serialize its index and
/// length (2 * base.kSizeOfVector). Both inline and non-inline strings are
/// counted.
size_t stringSetBytes = base.kSizeOfVector + sizeof(size_t);

/// Constructs the accumulator by forwarding 'type' and 'allocator' to 'base'.
StringViewSetAccumulator(const TypePtr& type, HashStringAllocator* allocator)
: base{type, allocator} {}

Expand All @@ -134,14 +229,7 @@ struct StringViewSetAccumulator {
}
} else {
auto value = decoded.valueAt<StringView>(index);
if (!value.isInline()) {
if (base.uniqueValues.contains(value)) {
return;
}
value = strings.append(value, *allocator);
}
base.uniqueValues.insert(
{value, base.nullIndex.has_value() ? cnt + 1 : cnt});
addValue(value, base.nullIndex.has_value() ? cnt + 1 : cnt, allocator);
}
}

Expand All @@ -158,6 +246,23 @@ struct StringViewSetAccumulator {
}
}

void deserialize(
const StringView& serialized,
HashStringAllocator* allocator) {
common::InputByteStream stream(serialized.data());
const auto size = serialized.size();
base.deserializeNullIndex(stream);
auto numValues = stream.read<size_t>();

while (stream.offset() < size) {
auto index = stream.read<vector_size_t>();
auto length = stream.read<vector_size_t>();
addValue(StringView(stream.read<char>(length), length), index, allocator);
}

VELOX_CHECK_EQ(numValues, base.uniqueValues.size());
}

/// Returns the size reported by 'base'.
size_t size() const {
return base.size();
}
Expand All @@ -168,11 +273,53 @@ struct StringViewSetAccumulator {
return base.extractValues(values, offset);
}

/// Extracts in result[index] a serialized VARBINARY for the String Values.
/// This is used for the spill of this accumulator.
void serialize(const VectorPtr& result, vector_size_t index) {
auto* flatResult = result->as<FlatVector<StringView>>();
auto* rawBuffer =
flatResult->getRawStringBufferWithSpace(stringSetBytes, true);
common::OutputByteStream stream(rawBuffer);
auto nullIndexValue = base.nullIndexSerializationValue();
stream.appendOne(nullIndexValue);
stream.appendOne(base.uniqueValues.size());

for (const auto& value : base.uniqueValues) {
// Index.
stream.appendOne(value.second);
// String length.
vector_size_t length = value.first.size();
stream.appendOne(length);
// String value.
stream.append(value.first.data(), length);
}

flatResult->setNoCopy(index, StringView(rawBuffer, stringSetBytes));
}

/// Frees 'strings' through 'allocator', then runs the destructor of 'base'
/// in place.
void free(HashStringAllocator& allocator) {
  strings.free(allocator);

  using BaseAccumulator = decltype(base);
  base.~BaseAccumulator();
}

private:
void addValue(
const StringView& value,
vector_size_t index,
HashStringAllocator* allocator) {
if (base.uniqueValues.contains(value)) {
return;
}
StringView valueCopy = value;
if (!valueCopy.isInline()) {
valueCopy = strings.append(value, *allocator);
}

base.uniqueValues.insert({valueCopy, index});
// Accounts for serializing the index and length of the string as well.
stringSetBytes += 2 * base.kSizeOfVector + valueCopy.size();
}
};

/// Maintains a set of unique arrays, maps or structs.
Expand All @@ -187,6 +334,13 @@ struct ComplexTypeSetAccumulator {
/// Stores unique non-null values.
AddressableNonNullValueList values;

/// Size (in bytes) of the buffer needed to serialize this accumulator for
/// spilling. Initialized to the size of the header, i.e. the null index plus
/// the number of unique values.
size_t totalSize = base.kSizeOfVector + sizeof(size_t);

/// Number of bytes used to serialize the hash of a value.
static constexpr size_t kSizeOfHash = sizeof(uint64_t);

ComplexTypeSetAccumulator(const TypePtr& type, HashStringAllocator* allocator)
: base{
AddressableNonNullValueList::Hash{},
Expand All @@ -195,21 +349,17 @@ struct ComplexTypeSetAccumulator {

/// Adds the value at row 'i' of 'decoded' to the set.
///
/// A null row records the null index the first time a null is seen and is
/// otherwise ignored. A non-null row is appended to 'values' and handed to
/// addEntry() together with the index it would occupy in the set.
///
/// @param decoded Input vector to read the value from.
/// @param i Row of 'decoded' to add.
/// @param allocator Allocator used to store the appended value.
void addValue(
    const DecodedVector& decoded,
    vector_size_t i,
    HashStringAllocator* allocator) {
  const auto cnt = base.uniqueValues.size();
  if (decoded.isNullAt(i)) {
    if (!base.nullIndex.has_value()) {
      base.nullIndex = cnt;
    }
  } else {
    const auto entry = values.append(decoded, i, allocator);
    // A recorded null occupies one index of its own, so the next free index
    // is one past the number of unique non-null values.
    const auto index = base.nullIndex.has_value() ? cnt + 1 : cnt;
    addEntry(entry, index);
  }
}

Expand All @@ -226,6 +376,26 @@ struct ComplexTypeSetAccumulator {
}
}

void deserialize(
const StringView& serialized,
HashStringAllocator* allocator) {
auto stream = common::InputByteStream(serialized.data());
base.deserializeNullIndex(stream);
auto numValues = stream.read<size_t>();

const auto size = serialized.size();
while (stream.offset() < size) {
auto index = stream.read<vector_size_t>();
auto length = stream.read<vector_size_t>();
auto hash = stream.read<uint64_t>();
auto contents = StringView(stream.read<char>(length), length);
auto position = values.appendSerialized(contents, allocator);
addEntry({position, contents.size(), hash}, index);
}

VELOX_CHECK_EQ(numValues, base.uniqueValues.size());
}

/// Returns the size reported by 'base'.
size_t size() const {
return base.size();
}
Expand All @@ -243,11 +413,75 @@ struct ComplexTypeSetAccumulator {
return base.uniqueValues.size() + (base.nullIndex.has_value() ? 1 : 0);
}

/// Extracts in result[index] a serialized VARBINARY for the String Values.
/// This is used for the spill of this accumulator.
void serialize(const VectorPtr& result, vector_size_t index) {
auto* flatResult = result->as<FlatVector<StringView>>();
auto* rawBuffer = flatResult->getRawStringBufferWithSpace(totalSize, true);

SerializationStream stream(rawBuffer, totalSize);

auto nullIndexValue = base.nullIndexSerializationValue();
stream.append(&nullIndexValue, base.kSizeOfVector);

auto numValues = base.uniqueValues.size();
stream.append(&numValues, sizeof(size_t));

for (const auto& value : base.uniqueValues) {
// Index.
stream.append(&value.second, base.kSizeOfVector);
// Complex type Length.
stream.append(&value.first.size, base.kSizeOfVector);
// Complex type hash.
stream.append(&value.first.hash, kSizeOfHash);
// Complex type value.
stream.append(value.first);
}

flatResult->setNoCopy(index, StringView(rawBuffer, totalSize));
}

/// Frees 'values' through 'allocator', then runs the destructor of 'base' in
/// place.
void free(HashStringAllocator& allocator) {
  values.free(allocator);

  using BaseAccumulator = decltype(base);
  base.~BaseAccumulator();
}

private:
// Simple stream abstraction for serialization logic. 'append' calls
// concatenate values into the caller-provided buffer and fail if a write
// would go past 'totalSize' bytes.
//
// Sizes and offsets are size_t to match the size_t buffer size the
// accumulator tracks; a narrower signed type would truncate large sizes and
// could overflow in the bounds checks.
struct SerializationStream {
  // Start of the destination buffer. Not owned.
  char* rawBuffer;
  // Capacity of 'rawBuffer' in bytes.
  const size_t totalSize;
  // Number of bytes written so far.
  size_t offset = 0;

  SerializationStream(char* buffer, size_t totalSize)
      : rawBuffer(buffer), totalSize(totalSize) {}

  // Copies 'size' bytes starting at 'value' to the end of the stream.
  void append(const void* value, size_t size) {
    VELOX_CHECK_LE(offset + size, totalSize);
    memcpy(rawBuffer + offset, value, size);
    offset += size;
  }

  // Writes the serialized contents of 'entry' to the end of the stream and
  // advances by entry.size bytes.
  void append(const AddressableNonNullValueList::Entry& entry) {
    VELOX_CHECK_LE(offset + entry.size, totalSize);
    AddressableNonNullValueList::readSerialized(entry, rawBuffer + offset);
    offset += entry.size;
  }
};

/// Inserts 'entry' into the set under 'index'. If an equal entry is already
/// present, 'entry' is removed from 'values' again; otherwise 'totalSize'
/// grows by the bytes needed to serialize the new entry.
void addEntry(
    const AddressableNonNullValueList::Entry& entry,
    vector_size_t index) {
  const bool inserted = base.uniqueValues.insert({entry, index}).second;
  if (inserted) {
    // A serialized entry is its index and length (2 * kSizeOfVector), its
    // hash, and the bytes of the value itself.
    totalSize += 2 * base.kSizeOfVector + kSizeOfHash + entry.size;
    return;
  }
  values.removeLast(entry);
}
};

template <typename T>
Expand Down
1 change: 1 addition & 0 deletions velox/exec/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ add_executable(
RowContainerTest.cpp
RowNumberTest.cpp
MarkDistinctTest.cpp
SetAccumulatorTest.cpp
SharedArbitratorTest.cpp
SpillTest.cpp
SpillOperatorGroupTest.cpp
Expand Down
Loading

0 comments on commit e2c8b17

Please sign in to comment.