Skip to content

Commit

Permalink
Add new AddressableNonNullValueList APIs to copy/append a stream of b…
Browse files Browse the repository at this point in the history
…ytes
  • Loading branch information
aditi-pandit committed Feb 6, 2024
1 parent fcd1699 commit 6705014
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 12 deletions.
2 changes: 2 additions & 0 deletions velox/common/base/IOUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <cstdint>
#include <cstring>

#include "velox/common/memory/HashStringAllocator.h"

namespace facebook::velox::common {
struct OutputByteStream {
explicit OutputByteStream(char* data, int32_t offset = 0)
Expand Down
4 changes: 4 additions & 0 deletions velox/common/memory/ByteStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ class ByteOutputStream {

void operator=(const ByteOutputStream& other) = delete;

// Forcing a move constructor to be able to return ByteOutputStream objects
// from a function.
ByteOutputStream(ByteOutputStream&&) = default;

/// Sets 'this' to range over 'range'. If this is for purposes of writing,
/// lastWrittenPosition specifies the end of any pre-existing content in
/// 'range'.
Expand Down
42 changes: 37 additions & 5 deletions velox/exec/AddressableNonNullValueList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

namespace facebook::velox::aggregate::prestosql {

AddressableNonNullValueList::Entry AddressableNonNullValueList::append(
const DecodedVector& decoded,
vector_size_t index,
ByteOutputStream AddressableNonNullValueList::initStream(
HashStringAllocator* allocator) {
ByteOutputStream stream(allocator);
if (!firstHeader_) {
Expand All @@ -30,21 +28,28 @@ AddressableNonNullValueList::Entry AddressableNonNullValueList::append(
// and a next pointer. This could be adaptive, with smaller initial
// sizes for lots of small arrays.
static constexpr int kInitialSize = 44;

currentPosition_ = allocator->newWrite(stream, kInitialSize);
firstHeader_ = currentPosition_.header;
} else {
allocator->extendWrite(currentPosition_, stream);
}

return stream;
}

AddressableNonNullValueList::Entry AddressableNonNullValueList::append(
const DecodedVector& decoded,
vector_size_t index,
HashStringAllocator* allocator) {
auto stream = initStream(allocator);

const auto hash = decoded.base()->hashValueAt(decoded.index(index));

const auto originalSize = stream.size();

// Write value.
exec::ContainerRowSerde::serialize(
*decoded.base(), decoded.index(index), stream);

++size_;

auto startAndFinish = allocator->finishWrite(stream, 1024);
Expand All @@ -55,6 +60,25 @@ AddressableNonNullValueList::Entry AddressableNonNullValueList::append(
return {startAndFinish.first, writtenSize, hash};
}

std::pair<HashStringAllocator::Position, size_t>
AddressableNonNullValueList::appendSerialized(
const StringView& value,
HashStringAllocator* allocator) {
auto stream = initStream(allocator);

const auto originalSize = stream.size();
stream.appendStringView(value);
++size_;

auto startAndFinish = allocator->finishWrite(stream, 1024);
currentPosition_ = startAndFinish.second;

const auto writtenSize = stream.size() - originalSize;
VELOX_CHECK_EQ(writtenSize, value.size());

return {startAndFinish.first, writtenSize};
}

namespace {

ByteInputStream prepareRead(const AddressableNonNullValueList::Entry& entry) {
Expand Down Expand Up @@ -94,4 +118,12 @@ void AddressableNonNullValueList::read(
exec::ContainerRowSerde::deserialize(stream, index, &result);
}

// static
void AddressableNonNullValueList::readSerialized(
const Entry& position,
char* dest) {
auto stream = prepareRead(position);
stream.readBytes(dest, position.size);
}

} // namespace facebook::velox::aggregate::prestosql
13 changes: 13 additions & 0 deletions velox/exec/AddressableNonNullValueList.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#pragma once

#include "velox/common/base/IOUtils.h"
#include "velox/common/memory/HashStringAllocator.h"
#include "velox/vector/DecodedVector.h"

Expand Down Expand Up @@ -57,6 +58,13 @@ class AddressableNonNullValueList {
vector_size_t index,
HashStringAllocator* allocator);

/// Append a non-null serialized value to the end of the list.
/// Returns (position, size) that can be used to access the
/// value later.
std::pair<HashStringAllocator::Position, size_t> appendSerialized(
const StringView& value,
HashStringAllocator* allocator);

/// Removes last element. 'position' must be a value returned from the latest
/// call to 'append'.
void removeLast(const Entry& entry) {
Expand All @@ -77,13 +85,18 @@ class AddressableNonNullValueList {
static void
read(const Entry& position, BaseVector& result, vector_size_t index);

/// Copies to 'dest' entry.size bytes at position.
static void readSerialized(const Entry& position, char* dest);

void free(HashStringAllocator& allocator) {
if (size_ > 0) {
allocator.free(firstHeader_);
}
}

private:
ByteOutputStream initStream(HashStringAllocator* allocator);

// Memory allocation (potentially multi-part).
HashStringAllocator::Header* firstHeader_{nullptr};
HashStringAllocator::Position currentPosition_{nullptr, nullptr};
Expand Down
89 changes: 82 additions & 7 deletions velox/exec/tests/AddressableNonNullValueListTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#include "velox/exec/AddressableNonNullValueList.h"
#include <gtest/gtest.h>
#include "velox/common/base/IOUtils.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

namespace facebook::velox::aggregate::prestosql {
Expand All @@ -28,13 +29,17 @@ class AddressableNonNullValueListTest : public testing::Test,
memory::MemoryManager::testingSetInstance({});
}

using T = AddressableNonNullValueList::Entry;
using Set = folly::F14FastSet<
T,
AddressableNonNullValueList::Hash,
AddressableNonNullValueList::EqualTo,
AlignedStlAllocator<T, 16>>;

static constexpr size_t kSizeOfHash = sizeof(uint64_t);
static constexpr size_t kSizeOfLength = sizeof(vector_size_t);

void test(const VectorPtr& data, const VectorPtr& uniqueData) {
using T = AddressableNonNullValueList::Entry;
using Set = folly::F14FastSet<
T,
AddressableNonNullValueList::Hash,
AddressableNonNullValueList::EqualTo,
AlignedStlAllocator<T, 16>>;

Set uniqueValues{
0,
Expand All @@ -46,6 +51,10 @@ class AddressableNonNullValueListTest : public testing::Test,

std::vector<T> entries;

// Tracks the number of bytes for serializing the
// AddressableNonNullValueList.
vector_size_t totalSize = 0;

DecodedVector decodedVector(*data);
for (auto i = 0; i < data->size(); ++i) {
auto entry = values.append(decodedVector, i, allocator());
Expand All @@ -56,6 +65,9 @@ class AddressableNonNullValueListTest : public testing::Test,
}

entries.push_back(entry);
// The total size for serialization is
// (size of length + size of hash + actual value size) for each entry.
totalSize += entry.size + kSizeOfHash + kSizeOfLength;

ASSERT_TRUE(uniqueValues.insert(entry).second);
ASSERT_TRUE(uniqueValues.contains(entry));
Expand All @@ -65,7 +77,19 @@ class AddressableNonNullValueListTest : public testing::Test,
ASSERT_EQ(uniqueData->size(), values.size());
ASSERT_EQ(uniqueData->size(), uniqueValues.size());

auto copy = BaseVector::create(data->type(), uniqueData->size(), pool());
testDirectRead(entries, uniqueValues, uniqueData);
testSerialization(entries, totalSize, uniqueData);
}

// Test direct read from AddressableNonNullValueList.
// Reads AddressableNonNullValueList into a vector, and validates its
// content.
void testDirectRead(
const std::vector<T>& entries,
const Set& uniqueValues,
const VectorPtr& uniqueData) {
auto copy =
BaseVector::create(uniqueData->type(), uniqueData->size(), pool());
for (auto i = 0; i < entries.size(); ++i) {
auto entry = entries[i];
ASSERT_TRUE(uniqueValues.contains(entry));
Expand All @@ -75,6 +99,57 @@ class AddressableNonNullValueListTest : public testing::Test,
test::assertEqualVectors(uniqueData, copy);
}

// Test copy/appendSerialized round-trip for AddressableNonNullValueList.
// Steps in the test:
// i) Copy entry length, hash and value of each entry to a stream.
// ii) Deserialize stream to a new set of entries.
// iii) Read deserialized entries back into a vector.
// iv) Validate the result vector.
void testSerialization(
const std::vector<T>& entries,
vector_size_t totalSize,
const VectorPtr& uniqueData) {
size_t offset = 0;
auto buffer = AlignedBuffer::allocate<char>(totalSize, pool());
auto* rawBuffer = buffer->as<char>();

auto append = [&](const void* value, size_t size) {
memcpy((void*)(rawBuffer + offset), value, size);
offset += size;
};

for (const auto& entry : entries) {
append(&entry.size, kSizeOfLength);
append(&entry.hash, kSizeOfHash);
AddressableNonNullValueList::readSerialized(
entry, (char*)(rawBuffer + offset));
offset += entry.size;
}
ASSERT_EQ(offset, totalSize);

// Deserialize entries from the stream.
AddressableNonNullValueList deserialized;
std::vector<T> deserializedEntries;
common::InputByteStream stream(rawBuffer);
while (stream.offset() < totalSize) {
auto length = stream.read<vector_size_t>();
auto hash = stream.read<uint64_t>();
auto entry = deserialized.appendSerialized(
StringView(stream.read<char>(length), length), allocator());
deserializedEntries.push_back({entry.first, entry.second, hash});
}

// Direct read from deserialized AddressableNonNullValueList. Validate the
// results.
auto deserializedCopy =
BaseVector::create(uniqueData->type(), uniqueData->size(), pool());
for (auto i = 0; i < deserializedEntries.size(); ++i) {
auto entry = deserializedEntries[i];
AddressableNonNullValueList::read(entry, *deserializedCopy, i);
}
test::assertEqualVectors(uniqueData, deserializedCopy);
}

HashStringAllocator* allocator() {
return allocator_.get();
}
Expand Down

0 comments on commit 6705014

Please sign in to comment.