diff --git a/velox/common/memory/ByteStream.h b/velox/common/memory/ByteStream.h index 0c623dfd29ee..677d6659df51 100644 --- a/velox/common/memory/ByteStream.h +++ b/velox/common/memory/ByteStream.h @@ -227,6 +227,10 @@ class ByteOutputStream { void operator=(const ByteOutputStream& other) = delete; + // Forcing a move constructor to be able to return ByteOutputStream objects + // from a function. + ByteOutputStream(ByteOutputStream&&) = default; + /// Sets 'this' to range over 'range'. If this is for purposes of writing, /// lastWrittenPosition specifies the end of any pre-existing content in /// 'range'. diff --git a/velox/exec/AddressableNonNullValueList.cpp b/velox/exec/AddressableNonNullValueList.cpp index dc0bc686c29b..4f440dc4422c 100644 --- a/velox/exec/AddressableNonNullValueList.cpp +++ b/velox/exec/AddressableNonNullValueList.cpp @@ -18,9 +18,7 @@ namespace facebook::velox::aggregate::prestosql { -AddressableNonNullValueList::Entry AddressableNonNullValueList::append( - const DecodedVector& decoded, - vector_size_t index, +ByteOutputStream AddressableNonNullValueList::initStream( HashStringAllocator* allocator) { ByteOutputStream stream(allocator); if (!firstHeader_) { @@ -30,13 +28,21 @@ AddressableNonNullValueList::Entry AddressableNonNullValueList::append( // and a next pointer. This could be adaptive, with smaller initial // sizes for lots of small arrays. 
static constexpr int kInitialSize = 44; - currentPosition_ = allocator->newWrite(stream, kInitialSize); firstHeader_ = currentPosition_.header; } else { allocator->extendWrite(currentPosition_, stream); } + return stream; +} + +AddressableNonNullValueList::Entry AddressableNonNullValueList::append( + const DecodedVector& decoded, + vector_size_t index, + HashStringAllocator* allocator) { + auto stream = initStream(allocator); + const auto hash = decoded.base()->hashValueAt(decoded.index(index)); const auto originalSize = stream.size(); @@ -44,7 +50,6 @@ AddressableNonNullValueList::Entry AddressableNonNullValueList::append( // Write value. exec::ContainerRowSerde::serialize( *decoded.base(), decoded.index(index), stream); - ++size_; auto startAndFinish = allocator->finishWrite(stream, 1024); @@ -55,6 +60,21 @@ AddressableNonNullValueList::Entry AddressableNonNullValueList::append( return {startAndFinish.first, writtenSize, hash}; } +HashStringAllocator::Position AddressableNonNullValueList::appendSerialized( + const StringView& value, + HashStringAllocator* allocator) { + auto stream = initStream(allocator); + + const auto originalSize = stream.size(); + stream.appendStringView(value); + ++size_; + + auto startAndFinish = allocator->finishWrite(stream, 1024); + currentPosition_ = startAndFinish.second; + VELOX_CHECK_EQ(stream.size() - originalSize, value.size()); + return {startAndFinish.first}; +} + namespace { ByteInputStream prepareRead(const AddressableNonNullValueList::Entry& entry) { @@ -94,4 +114,12 @@ void AddressableNonNullValueList::read( exec::ContainerRowSerde::deserialize(stream, index, &result); } +// static +void AddressableNonNullValueList::readSerialized( + const Entry& position, + char* dest) { + auto stream = prepareRead(position); + stream.readBytes(dest, position.size); +} + } // namespace facebook::velox::aggregate::prestosql diff --git a/velox/exec/AddressableNonNullValueList.h b/velox/exec/AddressableNonNullValueList.h index 
cd142385e559..81fd6a66aab9 100644 --- a/velox/exec/AddressableNonNullValueList.h +++ b/velox/exec/AddressableNonNullValueList.h @@ -15,6 +15,7 @@ */ #pragma once +#include "velox/common/base/IOUtils.h" #include "velox/common/memory/HashStringAllocator.h" #include "velox/vector/DecodedVector.h" @@ -57,6 +58,12 @@ class AddressableNonNullValueList { vector_size_t index, HashStringAllocator* allocator); + /// Append a non-null serialized value to the end of the list. + /// Returns position that can be used to access the value later. + HashStringAllocator::Position appendSerialized( + const StringView& value, + HashStringAllocator* allocator); + /// Removes last element. 'position' must be a value returned from the latest /// call to 'append'. void removeLast(const Entry& entry) { @@ -77,6 +84,9 @@ class AddressableNonNullValueList { static void read(const Entry& position, BaseVector& result, vector_size_t index); + /// Copies to 'dest' entry.size bytes at position. + static void readSerialized(const Entry& position, char* dest); + void free(HashStringAllocator& allocator) { if (size_ > 0) { allocator.free(firstHeader_); @@ -84,6 +94,8 @@ class AddressableNonNullValueList { } private: + ByteOutputStream initStream(HashStringAllocator* allocator); + // Memory allocation (potentially multi-part). 
HashStringAllocator::Header* firstHeader_{nullptr}; HashStringAllocator::Position currentPosition_{nullptr, nullptr}; diff --git a/velox/exec/tests/AddressableNonNullValueListTest.cpp b/velox/exec/tests/AddressableNonNullValueListTest.cpp index 546d3fb47476..bb4983ea0308 100644 --- a/velox/exec/tests/AddressableNonNullValueListTest.cpp +++ b/velox/exec/tests/AddressableNonNullValueListTest.cpp @@ -15,6 +15,7 @@ */ #include "velox/exec/AddressableNonNullValueList.h" #include <gtest/gtest.h> +#include "velox/common/base/IOUtils.h" #include "velox/vector/tests/utils/VectorTestBase.h" namespace facebook::velox::aggregate::prestosql { @@ -28,14 +29,17 @@ class AddressableNonNullValueListTest : public testing::Test, memory::MemoryManager::testingSetInstance({}); } - void test(const VectorPtr& data, const VectorPtr& uniqueData) { - using T = AddressableNonNullValueList::Entry; - using Set = folly::F14FastSet< - T, - AddressableNonNullValueList::Hash, - AddressableNonNullValueList::EqualTo, - AlignedStlAllocator<T, 16>>; + using T = AddressableNonNullValueList::Entry; + using Set = folly::F14FastSet< + T, + AddressableNonNullValueList::Hash, + AddressableNonNullValueList::EqualTo, + AlignedStlAllocator<T, 16>>; + + static constexpr size_t kSizeOfHash = sizeof(uint64_t); + static constexpr size_t kSizeOfLength = sizeof(vector_size_t); + void test(const VectorPtr& data, const VectorPtr& uniqueData) { Set uniqueValues{ 0, AddressableNonNullValueList::Hash{}, @@ -46,6 +50,10 @@ class AddressableNonNullValueListTest : public testing::Test, std::vector<T> entries; + // Tracks the number of bytes for serializing the + // AddressableNonNullValueList.
+ vector_size_t totalSize = 0; + DecodedVector decodedVector(*data); for (auto i = 0; i < data->size(); ++i) { auto entry = values.append(decodedVector, i, allocator()); @@ -56,6 +64,9 @@ class AddressableNonNullValueListTest : public testing::Test, } entries.push_back(entry); + // The total size for serialization is + // (size of length + size of hash + actual value size) for each entry. + totalSize += entry.size + kSizeOfHash + kSizeOfLength; ASSERT_TRUE(uniqueValues.insert(entry).second); ASSERT_TRUE(uniqueValues.contains(entry)); @@ -65,7 +76,19 @@ class AddressableNonNullValueListTest : public testing::Test, ASSERT_EQ(uniqueData->size(), values.size()); ASSERT_EQ(uniqueData->size(), uniqueValues.size()); - auto copy = BaseVector::create(data->type(), uniqueData->size(), pool()); + testDirectRead(entries, uniqueValues, uniqueData); + testSerialization(entries, totalSize, uniqueData); + } + + // Test direct read from AddressableNonNullValueList. + // Reads AddressableNonNullValueList into a vector, and validates its + // content. + void testDirectRead( + const std::vector<T>& entries, + const Set& uniqueValues, + const VectorPtr& uniqueData) { + auto copy = + BaseVector::create(uniqueData->type(), uniqueData->size(), pool()); for (auto i = 0; i < entries.size(); ++i) { auto entry = entries[i]; ASSERT_TRUE(uniqueValues.contains(entry)); @@ -75,6 +98,57 @@ class AddressableNonNullValueListTest : public testing::Test, test::assertEqualVectors(uniqueData, copy); } + // Test copy/appendSerialized round-trip for AddressableNonNullValueList. + // Steps in the test: + // i) Copy entry length, hash and value of each entry to a stream. + // ii) Deserialize stream to a new set of entries. + // iii) Read deserialized entries back into a vector. + // iv) Validate the result vector.
+ void testSerialization( + const std::vector<T>& entries, + vector_size_t totalSize, + const VectorPtr& uniqueData) { + size_t offset = 0; + auto buffer = AlignedBuffer::allocate<char>(totalSize, pool()); + auto* rawBuffer = buffer->asMutable<char>(); + + auto append = [&](const void* value, size_t size) { + memcpy((void*)(rawBuffer + offset), value, size); + offset += size; + }; + + for (const auto& entry : entries) { + append(&entry.size, kSizeOfLength); + append(&entry.hash, kSizeOfHash); + AddressableNonNullValueList::readSerialized( + entry, (char*)(rawBuffer + offset)); + offset += entry.size; + } + ASSERT_EQ(offset, totalSize); + + // Deserialize entries from the stream. + AddressableNonNullValueList deserialized; + std::vector<T> deserializedEntries; + common::InputByteStream stream(rawBuffer); + while (stream.offset() < totalSize) { + auto length = stream.read<vector_size_t>(); + auto hash = stream.read<uint64_t>(); + StringView contents(stream.read<char>(length), length); + auto position = deserialized.appendSerialized(contents, allocator()); + deserializedEntries.push_back({position, contents.size(), hash}); + } + + // Direct read from deserialized AddressableNonNullValueList. Validate the + // results. + auto deserializedCopy = + BaseVector::create(uniqueData->type(), uniqueData->size(), pool()); + for (auto i = 0; i < deserializedEntries.size(); ++i) { + auto entry = deserializedEntries[i]; + AddressableNonNullValueList::read(entry, *deserializedCopy, i); + } + test::assertEqualVectors(uniqueData, deserializedCopy); + } + HashStringAllocator* allocator() { return allocator_.get(); }