From 7cf983df9879495cf4d024a93e33f6fb54121d2c Mon Sep 17 00:00:00 2001 From: aditi-pandit Date: Wed, 7 Feb 2024 10:26:36 -0800 Subject: [PATCH] Add new APIs to AddressableNonNullValueList to copy/append a stream of bytes (#8653) Summary: This is the first in a set of PRs to add support for spilling distinct aggregations (see full version in https://github.com/facebookincubator/velox/pull/7791). Spilling distinct aggregations needs support to spill SetAccumulators in which input values are cumulated. ComplexTypeSetAccumulators use AddressableNonNullValueList to serialize complex types. This PR adds new APIs to AddressableNonNullValueList so that it can copy/append a stream of bytes corresponding to a ComplexType value (array, map, struct). Pull Request resolved: https://github.com/facebookincubator/velox/pull/8653 Reviewed By: Yuhta Differential Revision: D53497000 Pulled By: mbasmanova fbshipit-source-id: 66d44d02a2c3bd5775725c8b8559feaed17c0813 --- velox/common/memory/ByteStream.h | 4 + velox/exec/AddressableNonNullValueList.cpp | 38 ++++++-- velox/exec/AddressableNonNullValueList.h | 12 +++ .../tests/AddressableNonNullValueListTest.cpp | 90 +++++++++++++++++-- 4 files changed, 131 insertions(+), 13 deletions(-) diff --git a/velox/common/memory/ByteStream.h b/velox/common/memory/ByteStream.h index 0c623dfd29ee..677d6659df51 100644 --- a/velox/common/memory/ByteStream.h +++ b/velox/common/memory/ByteStream.h @@ -227,6 +227,10 @@ class ByteOutputStream { void operator=(const ByteOutputStream& other) = delete; + // Forcing a move constructor to be able to return ByteOutputStream objects + // from a function. + ByteOutputStream(ByteOutputStream&&) = default; + /// Sets 'this' to range over 'range'. If this is for purposes of writing, /// lastWrittenPosition specifies the end of any pre-existing content in /// 'range'. diff --git a/velox/exec/AddressableNonNullValueList.cpp b/velox/exec/AddressableNonNullValueList.cpp index dc0bc686c29b..4f440dc4422c 100644 --- a/velox/exec/AddressableNonNullValueList.cpp +++ b/velox/exec/AddressableNonNullValueList.cpp @@ -18,9 +18,7 @@ namespace facebook::velox::aggregate::prestosql { -AddressableNonNullValueList::Entry AddressableNonNullValueList::append( - const DecodedVector& decoded, - vector_size_t index, +ByteOutputStream AddressableNonNullValueList::initStream( HashStringAllocator* allocator) { ByteOutputStream stream(allocator); if (!firstHeader_) { @@ -30,13 +28,21 @@ AddressableNonNullValueList::Entry AddressableNonNullValueList::append( // and a next pointer. This could be adaptive, with smaller initial // sizes for lots of small arrays. static constexpr int kInitialSize = 44; - currentPosition_ = allocator->newWrite(stream, kInitialSize); firstHeader_ = currentPosition_.header; } else { allocator->extendWrite(currentPosition_, stream); } + return stream; +} + +AddressableNonNullValueList::Entry AddressableNonNullValueList::append( + const DecodedVector& decoded, + vector_size_t index, + HashStringAllocator* allocator) { + auto stream = initStream(allocator); + const auto hash = decoded.base()->hashValueAt(decoded.index(index)); const auto originalSize = stream.size(); @@ -44,7 +50,6 @@ AddressableNonNullValueList::Entry AddressableNonNullValueList::append( // Write value. exec::ContainerRowSerde::serialize( *decoded.base(), decoded.index(index), stream); - ++size_; auto startAndFinish = allocator->finishWrite(stream, 1024); @@ -55,6 +60,21 @@ AddressableNonNullValueList::Entry AddressableNonNullValueList::append( return {startAndFinish.first, writtenSize, hash}; } +HashStringAllocator::Position AddressableNonNullValueList::appendSerialized( + const StringView& value, + HashStringAllocator* allocator) { + auto stream = initStream(allocator); + + const auto originalSize = stream.size(); + stream.appendStringView(value); + ++size_; + + auto startAndFinish = allocator->finishWrite(stream, 1024); + currentPosition_ = startAndFinish.second; + VELOX_CHECK_EQ(stream.size() - originalSize, value.size()); + return {startAndFinish.first}; +} + namespace { ByteInputStream prepareRead(const AddressableNonNullValueList::Entry& entry) { @@ -94,4 +114,12 @@ void AddressableNonNullValueList::read( exec::ContainerRowSerde::deserialize(stream, index, &result); } +// static +void AddressableNonNullValueList::readSerialized( + const Entry& position, + char* dest) { + auto stream = prepareRead(position); + stream.readBytes(dest, position.size); +} + } // namespace facebook::velox::aggregate::prestosql diff --git a/velox/exec/AddressableNonNullValueList.h b/velox/exec/AddressableNonNullValueList.h index cd142385e559..81fd6a66aab9 100644 --- a/velox/exec/AddressableNonNullValueList.h +++ b/velox/exec/AddressableNonNullValueList.h @@ -15,6 +15,7 @@ */ #pragma once +#include "velox/common/base/IOUtils.h" #include "velox/common/memory/HashStringAllocator.h" #include "velox/vector/DecodedVector.h" @@ -57,6 +58,12 @@ class AddressableNonNullValueList { vector_size_t index, HashStringAllocator* allocator); + /// Append a non-null serialized value to the end of the list. + /// Returns position that can be used to access the value later. + HashStringAllocator::Position appendSerialized( + const StringView& value, + HashStringAllocator* allocator); + /// Removes last element. 'position' must be a value returned from the latest /// call to 'append'. void removeLast(const Entry& entry) { @@ -77,6 +84,9 @@ class AddressableNonNullValueList { static void read(const Entry& position, BaseVector& result, vector_size_t index); + /// Copies to 'dest' entry.size bytes at position. + static void readSerialized(const Entry& position, char* dest); + void free(HashStringAllocator& allocator) { if (size_ > 0) { allocator.free(firstHeader_); @@ -84,6 +94,8 @@ class AddressableNonNullValueList { } private: + ByteOutputStream initStream(HashStringAllocator* allocator); + // Memory allocation (potentially multi-part). HashStringAllocator::Header* firstHeader_{nullptr}; HashStringAllocator::Position currentPosition_{nullptr, nullptr}; diff --git a/velox/exec/tests/AddressableNonNullValueListTest.cpp b/velox/exec/tests/AddressableNonNullValueListTest.cpp index 546d3fb47476..bb4983ea0308 100644 --- a/velox/exec/tests/AddressableNonNullValueListTest.cpp +++ b/velox/exec/tests/AddressableNonNullValueListTest.cpp @@ -15,6 +15,7 @@ */ #include "velox/exec/AddressableNonNullValueList.h" #include +#include "velox/common/base/IOUtils.h" #include "velox/vector/tests/utils/VectorTestBase.h" namespace facebook::velox::aggregate::prestosql { @@ -28,14 +29,17 @@ class AddressableNonNullValueListTest : public testing::Test, memory::MemoryManager::testingSetInstance({}); } - void test(const VectorPtr& data, const VectorPtr& uniqueData) { - using T = AddressableNonNullValueList::Entry; - using Set = folly::F14FastSet< - T, - AddressableNonNullValueList::Hash, - AddressableNonNullValueList::EqualTo, - AlignedStlAllocator>; + using T = AddressableNonNullValueList::Entry; + using Set = folly::F14FastSet< + T, + AddressableNonNullValueList::Hash, + AddressableNonNullValueList::EqualTo, + AlignedStlAllocator>; + + static constexpr size_t kSizeOfHash = sizeof(uint64_t); + static constexpr size_t kSizeOfLength = sizeof(vector_size_t); + void test(const VectorPtr& data, const VectorPtr& uniqueData) { Set uniqueValues{ 0, AddressableNonNullValueList::Hash{}, @@ -46,6 +50,10 @@ class AddressableNonNullValueListTest : public testing::Test, std::vector entries; + // Tracks the number of bytes for serializing the + // AddressableNonNullValueList. + vector_size_t totalSize = 0; + DecodedVector decodedVector(*data); for (auto i = 0; i < data->size(); ++i) { auto entry = values.append(decodedVector, i, allocator()); @@ -56,6 +64,9 @@ class AddressableNonNullValueListTest : public testing::Test, } entries.push_back(entry); + // The total size for serialization is + // (size of length + size of hash + actual value size) for each entry. + totalSize += entry.size + kSizeOfHash + kSizeOfLength; ASSERT_TRUE(uniqueValues.insert(entry).second); ASSERT_TRUE(uniqueValues.contains(entry)); @@ -65,7 +76,19 @@ class AddressableNonNullValueListTest : public testing::Test, ASSERT_EQ(uniqueData->size(), values.size()); ASSERT_EQ(uniqueData->size(), uniqueValues.size()); - auto copy = BaseVector::create(data->type(), uniqueData->size(), pool()); + testDirectRead(entries, uniqueValues, uniqueData); + testSerialization(entries, totalSize, uniqueData); + } + + // Test direct read from AddressableNonNullValueList. + // Reads AddressableNonNullValueList into a vector, and validates its + // content. + void testDirectRead( + const std::vector& entries, + const Set& uniqueValues, + const VectorPtr& uniqueData) { + auto copy = + BaseVector::create(uniqueData->type(), uniqueData->size(), pool()); for (auto i = 0; i < entries.size(); ++i) { auto entry = entries[i]; ASSERT_TRUE(uniqueValues.contains(entry)); @@ -75,6 +98,57 @@ class AddressableNonNullValueListTest : public testing::Test, test::assertEqualVectors(uniqueData, copy); } + // Test copy/appendSerialized round-trip for AddressableNonNullValueList. + // Steps in the test: + // i) Copy entry length, hash and value of each entry to a stream. + // ii) Deserialize stream to a new set of entries. + // iii) Read deserialized entries back into a vector. + // iv) Validate the result vector. + void testSerialization( + const std::vector& entries, + vector_size_t totalSize, + const VectorPtr& uniqueData) { + size_t offset = 0; + auto buffer = AlignedBuffer::allocate(totalSize, pool()); + auto* rawBuffer = buffer->asMutable(); + + auto append = [&](const void* value, size_t size) { + memcpy((void*)(rawBuffer + offset), value, size); + offset += size; + }; + + for (const auto& entry : entries) { + append(&entry.size, kSizeOfLength); + append(&entry.hash, kSizeOfHash); + AddressableNonNullValueList::readSerialized( + entry, (char*)(rawBuffer + offset)); + offset += entry.size; + } + ASSERT_EQ(offset, totalSize); + + // Deserialize entries from the stream. + AddressableNonNullValueList deserialized; + std::vector deserializedEntries; + common::InputByteStream stream(rawBuffer); + while (stream.offset() < totalSize) { + auto length = stream.read(); + auto hash = stream.read(); + StringView contents(stream.read(length), length); + auto position = deserialized.appendSerialized(contents, allocator()); + deserializedEntries.push_back({position, contents.size(), hash}); + } + + // Direct read from deserialized AddressableNonNullValueList. Validate the + // results. + auto deserializedCopy = + BaseVector::create(uniqueData->type(), uniqueData->size(), pool()); + for (auto i = 0; i < deserializedEntries.size(); ++i) { + auto entry = deserializedEntries[i]; + AddressableNonNullValueList::read(entry, *deserializedCopy, i); + } + test::assertEqualVectors(uniqueData, deserializedCopy); + } + HashStringAllocator* allocator() { return allocator_.get(); }