Add new APIs to AddressableNonNullValueList to copy/append a stream of bytes (facebookincubator#8653)

Summary:
This is the first in a set of PRs to add support for spilling distinct aggregations (see full version in facebookincubator#7791).

Spilling distinct aggregations requires spilling SetAccumulators, in which input values are accumulated. ComplexTypeSetAccumulators use AddressableNonNullValueList to serialize complex types. This PR adds new APIs to AddressableNonNullValueList so that it can copy/append a stream of bytes corresponding to a ComplexType value (array, map, struct).

Pull Request resolved: facebookincubator#8653

Reviewed By: Yuhta

Differential Revision: D53497000

Pulled By: mbasmanova

fbshipit-source-id: 66d44d02a2c3bd5775725c8b8559feaed17c0813
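
For orientation, the sketch below shows how the new copy/append APIs compose: serialized bytes are copied out of one list with readSerialized() and appended to another with appendSerialized(). This is a minimal sketch based on the usage exercised in the test file in this diff; the helper name copyEntry is hypothetical, and the allocator and an already-populated source list are assumed to be set up by the caller.

    #include <string>

    #include "velox/common/memory/HashStringAllocator.h"
    #include "velox/exec/AddressableNonNullValueList.h"

    using facebook::velox::HashStringAllocator;
    using facebook::velox::StringView;
    using facebook::velox::aggregate::prestosql::AddressableNonNullValueList;

    // Hypothetical helper (not part of this PR): copies the serialized bytes of
    // one entry out of a source list and appends them to 'target', returning an
    // Entry that can later be passed to AddressableNonNullValueList::read().
    AddressableNonNullValueList::Entry copyEntry(
        const AddressableNonNullValueList::Entry& entry,
        AddressableNonNullValueList& target,
        HashStringAllocator* allocator) {
      // Copy out the raw serialized bytes of 'entry'.
      std::string buffer(entry.size, '\0');
      AddressableNonNullValueList::readSerialized(entry, buffer.data());

      // Append the bytes to the target list; the returned position together
      // with the original size and hash addresses the copied value.
      auto position = target.appendSerialized(
          StringView(buffer.data(), buffer.size()), allocator);
      return {position, entry.size, entry.hash};
    }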
aditi-pandit authored and FelixYBW committed Feb 10, 2024
1 parent eac4772 commit 7cf983d
Showing 4 changed files with 131 additions and 13 deletions.
4 changes: 4 additions & 0 deletions velox/common/memory/ByteStream.h
@@ -227,6 +227,10 @@ class ByteOutputStream {

void operator=(const ByteOutputStream& other) = delete;

// Forcing a move constructor to be able to return ByteOutputStream objects
// from a function.
ByteOutputStream(ByteOutputStream&&) = default;

/// Sets 'this' to range over 'range'. If this is for purposes of writing,
/// lastWrittenPosition specifies the end of any pre-existing content in
/// 'range'.
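
The purpose of the defaulted move constructor becomes clear in the next file, where the new initStream() helper returns a ByteOutputStream by value. Below is a minimal sketch of that pattern, assuming only the constructor usage shown in this diff; the helper name makeStream is hypothetical.

    #include "velox/common/memory/ByteStream.h"
    #include "velox/common/memory/HashStringAllocator.h"

    using facebook::velox::ByteOutputStream;
    using facebook::velox::HashStringAllocator;

    // Hypothetical helper mirroring the initStream() refactor: builds a stream
    // backed by 'allocator' and returns it by value. Per the comment in the
    // header, the defaulted move constructor is what makes this return possible.
    ByteOutputStream makeStream(HashStringAllocator* allocator) {
      ByteOutputStream stream(allocator);
      return stream;
    }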
38 changes: 33 additions & 5 deletions velox/exec/AddressableNonNullValueList.cpp
@@ -18,9 +18,7 @@

namespace facebook::velox::aggregate::prestosql {

AddressableNonNullValueList::Entry AddressableNonNullValueList::append(
const DecodedVector& decoded,
vector_size_t index,
ByteOutputStream AddressableNonNullValueList::initStream(
HashStringAllocator* allocator) {
ByteOutputStream stream(allocator);
if (!firstHeader_) {
@@ -30,21 +28,28 @@ AddressableNonNullValueList::Entry AddressableNonNullValueList::append(
// and a next pointer. This could be adaptive, with smaller initial
// sizes for lots of small arrays.
static constexpr int kInitialSize = 44;

currentPosition_ = allocator->newWrite(stream, kInitialSize);
firstHeader_ = currentPosition_.header;
} else {
allocator->extendWrite(currentPosition_, stream);
}

return stream;
}

AddressableNonNullValueList::Entry AddressableNonNullValueList::append(
const DecodedVector& decoded,
vector_size_t index,
HashStringAllocator* allocator) {
auto stream = initStream(allocator);

const auto hash = decoded.base()->hashValueAt(decoded.index(index));

const auto originalSize = stream.size();

// Write value.
exec::ContainerRowSerde::serialize(
*decoded.base(), decoded.index(index), stream);

++size_;

auto startAndFinish = allocator->finishWrite(stream, 1024);
@@ -55,6 +60,21 @@ AddressableNonNullValueList::Entry AddressableNonNullValueList::append(
return {startAndFinish.first, writtenSize, hash};
}

HashStringAllocator::Position AddressableNonNullValueList::appendSerialized(
const StringView& value,
HashStringAllocator* allocator) {
auto stream = initStream(allocator);

const auto originalSize = stream.size();
stream.appendStringView(value);
++size_;

auto startAndFinish = allocator->finishWrite(stream, 1024);
currentPosition_ = startAndFinish.second;
VELOX_CHECK_EQ(stream.size() - originalSize, value.size());
return {startAndFinish.first};
}

namespace {

ByteInputStream prepareRead(const AddressableNonNullValueList::Entry& entry) {
@@ -94,4 +114,12 @@ void AddressableNonNullValueList::read(
exec::ContainerRowSerde::deserialize(stream, index, &result);
}

// static
void AddressableNonNullValueList::readSerialized(
const Entry& position,
char* dest) {
auto stream = prepareRead(position);
stream.readBytes(dest, position.size);
}

} // namespace facebook::velox::aggregate::prestosql
12 changes: 12 additions & 0 deletions velox/exec/AddressableNonNullValueList.h
@@ -15,6 +15,7 @@
*/
#pragma once

#include "velox/common/base/IOUtils.h"
#include "velox/common/memory/HashStringAllocator.h"
#include "velox/vector/DecodedVector.h"

@@ -57,6 +58,12 @@ class AddressableNonNullValueList {
vector_size_t index,
HashStringAllocator* allocator);

/// Append a non-null serialized value to the end of the list.
/// Returns position that can be used to access the value later.
HashStringAllocator::Position appendSerialized(
const StringView& value,
HashStringAllocator* allocator);

/// Removes last element. 'position' must be a value returned from the latest
/// call to 'append'.
void removeLast(const Entry& entry) {
@@ -77,13 +84,18 @@
static void
read(const Entry& position, BaseVector& result, vector_size_t index);

/// Copies entry.size bytes at 'position' into 'dest'.
static void readSerialized(const Entry& position, char* dest);

void free(HashStringAllocator& allocator) {
if (size_ > 0) {
allocator.free(firstHeader_);
}
}

private:
ByteOutputStream initStream(HashStringAllocator* allocator);

// Memory allocation (potentially multi-part).
HashStringAllocator::Header* firstHeader_{nullptr};
HashStringAllocator::Position currentPosition_{nullptr, nullptr};
90 changes: 82 additions & 8 deletions velox/exec/tests/AddressableNonNullValueListTest.cpp
@@ -15,6 +15,7 @@
*/
#include "velox/exec/AddressableNonNullValueList.h"
#include <gtest/gtest.h>
#include "velox/common/base/IOUtils.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

namespace facebook::velox::aggregate::prestosql {
@@ -28,14 +29,17 @@ class AddressableNonNullValueListTest : public testing::Test,
memory::MemoryManager::testingSetInstance({});
}

void test(const VectorPtr& data, const VectorPtr& uniqueData) {
using T = AddressableNonNullValueList::Entry;
using Set = folly::F14FastSet<
T,
AddressableNonNullValueList::Hash,
AddressableNonNullValueList::EqualTo,
AlignedStlAllocator<T, 16>>;
using T = AddressableNonNullValueList::Entry;
using Set = folly::F14FastSet<
T,
AddressableNonNullValueList::Hash,
AddressableNonNullValueList::EqualTo,
AlignedStlAllocator<T, 16>>;

static constexpr size_t kSizeOfHash = sizeof(uint64_t);
static constexpr size_t kSizeOfLength = sizeof(vector_size_t);

void test(const VectorPtr& data, const VectorPtr& uniqueData) {
Set uniqueValues{
0,
AddressableNonNullValueList::Hash{},
@@ -46,6 +50,10 @@

std::vector<T> entries;

// Tracks the number of bytes for serializing the
// AddressableNonNullValueList.
vector_size_t totalSize = 0;

DecodedVector decodedVector(*data);
for (auto i = 0; i < data->size(); ++i) {
auto entry = values.append(decodedVector, i, allocator());
@@ -56,6 +64,9 @@
}

entries.push_back(entry);
// The total size for serialization is
// (size of length + size of hash + actual value size) for each entry.
totalSize += entry.size + kSizeOfHash + kSizeOfLength;

ASSERT_TRUE(uniqueValues.insert(entry).second);
ASSERT_TRUE(uniqueValues.contains(entry));
@@ -65,7 +76,19 @@
ASSERT_EQ(uniqueData->size(), values.size());
ASSERT_EQ(uniqueData->size(), uniqueValues.size());

auto copy = BaseVector::create(data->type(), uniqueData->size(), pool());
testDirectRead(entries, uniqueValues, uniqueData);
testSerialization(entries, totalSize, uniqueData);
}

// Test direct read from AddressableNonNullValueList.
// Reads AddressableNonNullValueList into a vector, and validates its
// content.
void testDirectRead(
const std::vector<T>& entries,
const Set& uniqueValues,
const VectorPtr& uniqueData) {
auto copy =
BaseVector::create(uniqueData->type(), uniqueData->size(), pool());
for (auto i = 0; i < entries.size(); ++i) {
auto entry = entries[i];
ASSERT_TRUE(uniqueValues.contains(entry));
@@ -75,6 +98,57 @@
test::assertEqualVectors(uniqueData, copy);
}

// Test copy/appendSerialized round-trip for AddressableNonNullValueList.
// Steps in the test:
// i) Copy entry length, hash and value of each entry to a stream.
// ii) Deserialize stream to a new set of entries.
// iii) Read deserialized entries back into a vector.
// iv) Validate the result vector.
void testSerialization(
const std::vector<T>& entries,
vector_size_t totalSize,
const VectorPtr& uniqueData) {
size_t offset = 0;
auto buffer = AlignedBuffer::allocate<char>(totalSize, pool());
auto* rawBuffer = buffer->asMutable<char>();

auto append = [&](const void* value, size_t size) {
memcpy((void*)(rawBuffer + offset), value, size);
offset += size;
};

for (const auto& entry : entries) {
append(&entry.size, kSizeOfLength);
append(&entry.hash, kSizeOfHash);
AddressableNonNullValueList::readSerialized(
entry, (char*)(rawBuffer + offset));
offset += entry.size;
}
ASSERT_EQ(offset, totalSize);

// Deserialize entries from the stream.
AddressableNonNullValueList deserialized;
std::vector<T> deserializedEntries;
common::InputByteStream stream(rawBuffer);
while (stream.offset() < totalSize) {
auto length = stream.read<vector_size_t>();
auto hash = stream.read<uint64_t>();
StringView contents(stream.read<char>(length), length);
auto position = deserialized.appendSerialized(contents, allocator());
deserializedEntries.push_back({position, contents.size(), hash});
}

// Direct read from deserialized AddressableNonNullValueList. Validate the
// results.
auto deserializedCopy =
BaseVector::create(uniqueData->type(), uniqueData->size(), pool());
for (auto i = 0; i < deserializedEntries.size(); ++i) {
auto entry = deserializedEntries[i];
AddressableNonNullValueList::read(entry, *deserializedCopy, i);
}
test::assertEqualVectors(uniqueData, deserializedCopy);
}

HashStringAllocator* allocator() {
return allocator_.get();
}
