From c8b0c0a00a9eb7af404ddadad8642871369abc74 Mon Sep 17 00:00:00 2001 From: aditi-pandit Date: Fri, 2 Feb 2024 11:25:19 -0800 Subject: [PATCH] Add SetAccumulator serialize/deserialize logic --- velox/exec/SetAccumulator.h | 262 ++++++++++++++++++++-- velox/exec/tests/CMakeLists.txt | 1 + velox/exec/tests/SetAccumulatorTest.cpp | 286 ++++++++++++++++++++++++ 3 files changed, 532 insertions(+), 17 deletions(-) create mode 100644 velox/exec/tests/SetAccumulatorTest.cpp diff --git a/velox/exec/SetAccumulator.h b/velox/exec/SetAccumulator.h index 14814d6385b8..7705e0c4dbdb 100644 --- a/velox/exec/SetAccumulator.h +++ b/velox/exec/SetAccumulator.h @@ -16,6 +16,8 @@ #pragma once #include + +#include "velox/common/base/IOUtils.h" #include "velox/common/memory/HashStringAllocator.h" #include "velox/exec/AddressableNonNullValueList.h" #include "velox/exec/Strings.h" @@ -86,6 +88,37 @@ struct SetAccumulator { } } + void readNullIndex(common::InputByteStream& stream) { + VELOX_CHECK(!nullIndex.has_value()); + auto streamNullIndex = stream.read(); + if (streamNullIndex != kNoNullIndex) { + nullIndex = streamNullIndex; + } + } + + inline bool isNullIndex(size_t i) { + return nullIndex.has_value() && i == nullIndex.value(); + } + + /// Deserializes accumulator from previously serialized value. + void deserialize( + const StringView& serialized, + HashStringAllocator* /*allocator*/) { + // The serialized value is the nullOffset (kNoNullIndex if no null is + // present) followed by the unique values ordered by index. + common::InputByteStream stream(serialized.data()); + readNullIndex(stream); + + size_t i = 0; + auto size = serialized.size(); + while (stream.offset() < size) { + if (!isNullIndex(i)) { + uniqueValues.insert({stream.read(), i}); + } + i++; + } + } + /// Returns number of unique values including null. size_t size() const { return uniqueValues.size() + (nullIndex.has_value() ? 
1 : 0); @@ -106,10 +139,48 @@ struct SetAccumulator { : uniqueValues.size(); } + void serializeNullIndex(char* buffer) { + auto nullIndexValue = + nullIndex.has_value() ? nullIndex.value() : kNoNullIndex; + memcpy(buffer, &nullIndexValue, kVectorSizeT); + } + + /// Extracts in result[index] a serialized VARBINARY for the Set Values. + /// This is used for the spill of this accumulator. + void serialize(const VectorPtr& result, vector_size_t index) { + // The serialized value is the nullOffset (kNoNullIndex if no null is + // present) followed by the unique values in order of their indices. + // The null position is skipped when serializing the values. + size_t totalBytes = kVectorSizeT + kValueSizeT * uniqueValues.size(); + + auto* flatResult = result->as>(); + auto* rawBuffer = flatResult->getRawStringBufferWithSpace(totalBytes, true); + + serializeNullIndex(rawBuffer); + + auto nullPosition = nullIndex.has_value() ? nullIndex : uniqueValues.size(); + // To serialize values, we first compute its offset in the serialized buffer + // from the value's position tracked in the SetAccumulator. + auto offset = [&](vector_size_t index) { + // The null position is skipped when computing the buffer offset. + return kVectorSizeT + + (index < nullPosition ? index : index - 1) * kValueSizeT; + }; + for (auto value : uniqueValues) { + memcpy(rawBuffer + offset(value.second), &(value.first), kValueSizeT); + } + + flatResult->setNoCopy(index, StringView(rawBuffer, totalBytes)); + } + void free(HashStringAllocator& allocator) { using UT = decltype(uniqueValues); uniqueValues.~UT(); } + + static const vector_size_t kNoNullIndex = -1; + static constexpr size_t kVectorSizeT = sizeof(vector_size_t); + static constexpr size_t kValueSizeT = sizeof(T); }; /// Maintains a set of unique strings. @@ -120,6 +191,19 @@ struct StringViewSetAccumulator { /// Stores unique non-null non-inline strings. 
Strings strings; + /// Size (in bytes) of the serialized string values (this includes inline and + /// non-inline strings). This value also includes the bytes for serializing + /// the length value (base.kVectorSizeT) of the strings. + /// Used for computing serialized buffer size for spilling. + size_t stringSetBytes = base.kVectorSizeT; + + /// When serializing the strings for spilling, they are written in order of + /// their indexes. 'offsets' represents the offset of the unique value at that + /// index from the beginning of the serialization buffer. These offsets are + /// maintained to easily copy the unique value at that position in the + /// serialization buffer. + std::vector offsets; + StringViewSetAccumulator(const TypePtr& type, HashStringAllocator* allocator) : base{type, allocator} {} @@ -131,17 +215,14 @@ struct StringViewSetAccumulator { if (decoded.isNullAt(index)) { if (!base.nullIndex.has_value()) { base.nullIndex = cnt; + // nullIndex is never encountered in uniqueValues. But we add an entry + // in the offsets vector to maintain a direct mapping between the + // index and its position in offsets array. + offsets.push_back(stringSetBytes); } } else { auto value = decoded.valueAt(index); - if (!value.isInline()) { - if (base.uniqueValues.contains(value)) { - return; - } - value = strings.append(value, *allocator); - } - base.uniqueValues.insert( - {value, base.nullIndex.has_value() ? cnt + 1 : cnt}); + addValue(value, base.nullIndex.has_value() ? cnt + 1 : cnt, allocator); } } @@ -158,6 +239,29 @@ struct StringViewSetAccumulator { } } + void deserialize( + const StringView& serialized, + HashStringAllocator* allocator) { + common::InputByteStream stream(serialized.data()); + auto size = serialized.size(); + + // The serialized string consists of nullIndex (or kNoNullIndex) + // followed by pairs of (length, String value) of the unique + // values. The unique values are serialized in increasing order of their + // indexes. 
+ base.readNullIndex(stream); + + vector_size_t length; + vector_size_t i = 0; + while (stream.offset() < size) { + if (!base.isNullIndex(i)) { + length = stream.read(); + addValue(StringView(stream.read(length), length), i, allocator); + } + i++; + } + } + size_t size() const { return base.size(); } @@ -168,11 +272,56 @@ struct StringViewSetAccumulator { return base.extractValues(values, offset); } + /// Extracts in result[index] a serialized VARBINARY for the String Values. + /// This is used for the spill of this accumulator. + void serialize(const VectorPtr& result, vector_size_t index) { + // nullIndex (or kNoNullIndex) is serialized followed by pairs of + // (length, String value) of the unique values in the order of their + // indices. + auto* flatResult = result->as>(); + auto* rawBuffer = + flatResult->getRawStringBufferWithSpace(stringSetBytes, true); + base.serializeNullIndex(rawBuffer); + + vector_size_t length; + char* position; + // Copy the length and string value at the position from the offsets + // array. offsets accounts for skipping null index. + for (const auto& value : base.uniqueValues) { + position = rawBuffer + offsets[value.second]; + length = value.first.size(); + memcpy(position, &length, base.kVectorSizeT); + memcpy(position + base.kVectorSizeT, value.first.data(), length); + } + + flatResult->setNoCopy(index, StringView(rawBuffer, stringSetBytes)); + } + void free(HashStringAllocator& allocator) { strings.free(allocator); using Base = decltype(base); base.~Base(); } + + private: + void addValue( + const StringView& value, + vector_size_t index, + HashStringAllocator* allocator) { + if (base.uniqueValues.contains(value)) { + return; + } + StringView valueCopy = value; + if (!valueCopy.isInline()) { + valueCopy = strings.append(value, *allocator); + } + + base.uniqueValues.insert({valueCopy, index}); + // The new position is written at the end of the serialization buffer. 
+ offsets.push_back(stringSetBytes); + // Accounts for serializing the length of the string as well. + stringSetBytes += base.kVectorSizeT + valueCopy.size(); + } }; /// Maintains a set of unique arrays, maps or structs. @@ -187,29 +336,54 @@ struct ComplexTypeSetAccumulator { /// Stores unique non-null values. AddressableNonNullValueList values; + /// Tracks allocated bytes for sizing during serialization for spill. + /// Initialized to account for the serialization of the null index. + size_t totalSize = base.kVectorSizeT; + + /// When serializing the values for spilling, they are written in order of + /// their indexes. 'offsets' represents the offset of the unique value at that + /// index from the beginning of the serialization buffer. These offsets are + /// maintained to easily copy the unique value at that position in the + /// serialization buffer. + std::vector offsets; + + static constexpr size_t kHashSizeT = sizeof(uint64_t); + ComplexTypeSetAccumulator(const TypePtr& type, HashStringAllocator* allocator) : base{ AddressableNonNullValueList::Hash{}, AddressableNonNullValueList::EqualTo{type}, allocator} {} + void addEntry( + const AddressableNonNullValueList::Entry& entry, + vector_size_t index) { + if (!base.uniqueValues.insert({entry, index}).second) { + values.removeLast(entry); + } else { + offsets.push_back(totalSize); + // Accounts for the length of the complex type along with its size and + // hash. + totalSize += base.kVectorSizeT + kHashSizeT + entry.size; + } + } + void addValue( const DecodedVector& decoded, - vector_size_t index, + vector_size_t i, HashStringAllocator* allocator) { const auto cnt = base.uniqueValues.size(); - if (decoded.isNullAt(index)) { + if (decoded.isNullAt(i)) { if (!base.nullIndex.has_value()) { base.nullIndex = cnt; + // Adding an entry in the offsets array so that we can maintain + // a direct mapping of index in the offsets array. 
+ offsets.push_back(totalSize); + } } else { - auto entry = values.append(decoded, index, allocator); - - if (!base.uniqueValues - .insert({entry, base.nullIndex.has_value() ? cnt + 1 : cnt}) - .second) { - values.removeLast(entry); - } + auto entry = values.append(decoded, i, allocator); + auto index = base.nullIndex.has_value() ? cnt + 1 : cnt; + addEntry(entry, index); } } @@ -226,6 +400,33 @@ struct ComplexTypeSetAccumulator { } } + void deserialize( + const StringView& serialized, + HashStringAllocator* allocator) { + auto stream = common::InputByteStream(serialized.data()); + + // The serialized string contains the null index followed by triples of + // (value size, value hash, ComplexType value) of all unique values of the + // accumulator. The values are in the order of increasing indices. + base.readNullIndex(stream); + + vector_size_t length; + vector_size_t i = 0; + uint64_t hash; + auto size = serialized.size(); + while (stream.offset() < size) { + if (!base.isNullIndex(i)) { + length = stream.read(); + hash = stream.read(); + + auto result = values.appendSerialized( + StringView(stream.read(length), length), hash, allocator); + addEntry(result, i); + } + i++; + } + } + size_t size() const { return base.size(); } @@ -243,6 +444,33 @@ struct ComplexTypeSetAccumulator { return base.uniqueValues.size() + (base.nullIndex.has_value() ? 1 : 0); } + /// Extracts in result[index] a serialized VARBINARY for the complex values. + /// This is used for the spill of this accumulator. + void serialize(const VectorPtr& result, vector_size_t index) { + // nullIndex is serialized followed by triples of (value size, value hash, + // ComplexType value) of all unique values. The unique values are serialized + // in order of their indices. 
+ + auto* flatResult = result->as>(); + auto* rawBuffer = flatResult->getRawStringBufferWithSpace(totalSize, true); + base.serializeNullIndex(rawBuffer); + + size_t offset; + for (const auto& value : base.uniqueValues) { + offset = offsets.at(value.second); + + memcpy(rawBuffer + offset, &value.first.size, base.kVectorSizeT); + offset += base.kVectorSizeT; + + memcpy(rawBuffer + offset, &value.first.hash, kHashSizeT); + offset += kHashSizeT; + + AddressableNonNullValueList::copy(value.first, rawBuffer + offset); + } + + flatResult->setNoCopy(index, StringView(rawBuffer, totalSize)); + } + void free(HashStringAllocator& allocator) { values.free(allocator); using Base = decltype(base); diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index baf67dd3ea1d..d1bb226762aa 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -64,6 +64,7 @@ add_executable( RowContainerTest.cpp RowNumberTest.cpp MarkDistinctTest.cpp + SetAccumulatorTest.cpp SharedArbitratorTest.cpp SpillTest.cpp SpillOperatorGroupTest.cpp diff --git a/velox/exec/tests/SetAccumulatorTest.cpp b/velox/exec/tests/SetAccumulatorTest.cpp new file mode 100644 index 000000000000..a15d486903f1 --- /dev/null +++ b/velox/exec/tests/SetAccumulatorTest.cpp @@ -0,0 +1,286 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "velox/exec/SetAccumulator.h" + +#include +#include "velox/exec/AddressableNonNullValueList.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +namespace facebook::velox::aggregate::prestosql { + +namespace { + +// The tests in this class validate the following +// (for both Primitive and Complex types) : +// i) Builds a SetAccumulator from the input data. +// ii) Tracks the unique values in the input data for validation. +// iii) Serializes the SetAccumulator and de-serializes the result in a second +// accumulator. +// The test validates that both accumulators have the same contents and the +// contents of the deserialized accumulator comprise the unique values from +// the input data. +class SetAccumulatorTest : public testing::Test, public test::VectorTestBase { + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + } + + template + void testPrimitive(const VectorPtr& data) { + std::unordered_set uniqueValues; + SetAccumulator accumulator(data->type(), allocator()); + DecodedVector decodedVector(*data); + for (auto i = 0; i < data->size(); ++i) { + accumulator.addValue(decodedVector, i, allocator()); + uniqueValues.insert(decodedVector.valueAt(i)); + } + ASSERT_EQ(accumulator.size(), uniqueValues.size()); + + auto serialized = BaseVector::create(VARBINARY(), 1, pool()); + accumulator.serialize(serialized, 0); + + // Initialize another accumulator from the serialized vector. + SetAccumulator accumulator2(data->type(), allocator()); + accumulator2.deserialize( + serialized->asFlatVector()->valueAt(0), allocator()); + + // Extract the contents of the accumulator. The contents should match + // all the uniqueValues. 
+ auto copy = BaseVector::create(data->type(), accumulator2.size(), pool()); + auto copyFlat = copy->template asFlatVector(); + accumulator2.extractValues(*copyFlat, 0); + + ASSERT_EQ(copy->size(), uniqueValues.size()); + for (auto i = 0; i < copy->size(); i++) { + ASSERT_TRUE( + copyFlat->isNullAt(i) || + uniqueValues.count(copyFlat->valueAt(i)) != 0); + } + } + + void testComplexType(const VectorPtr& data) { + using T = AddressableNonNullValueList::Entry; + using Set = folly::F14FastSet< + T, + AddressableNonNullValueList::Hash, + AddressableNonNullValueList::EqualTo, + AlignedStlAllocator>; + + // Unique values set used for validation in the tests. + AddressableNonNullValueList values; + Set uniqueValues{ + 0, + AddressableNonNullValueList::Hash{}, + AddressableNonNullValueList::EqualTo{data->type()}, + AlignedStlAllocator(allocator())}; + + // Build an accumulator from the input data. Also create a set of the + // unique values for validation. + SetAccumulator accumulator1(data->type(), allocator()); + DecodedVector decodedVector(*data); + vector_size_t nullPosition = -1; + for (auto i = 0; i < data->size(); ++i) { + accumulator1.addValue(decodedVector, i, allocator()); + if (!decodedVector.isNullAt(i)) { + auto entry = values.append(decodedVector, i, allocator()); + if (uniqueValues.contains(entry)) { + values.removeLast(entry); + continue; + } + ASSERT_TRUE(uniqueValues.insert(entry).second); + ASSERT_TRUE(uniqueValues.contains(entry)); + ASSERT_FALSE(uniqueValues.insert(entry).second); + } else { + nullPosition = i; + } + } + + auto accumulatorSizeCheck = + [&](const SetAccumulator& accumulator) { + if (nullPosition != -1) { + ASSERT_EQ(accumulator.size(), uniqueValues.size() + 1); + } else { + ASSERT_EQ(accumulator.size(), uniqueValues.size()); + } + }; + accumulatorSizeCheck(accumulator1); + + // Serialize the accumulator. 
+ auto serialized = BaseVector::create(VARBINARY(), 1, pool()); + accumulator1.serialize(serialized, 0); + + // Initialize another accumulator from the serialized vector. + SetAccumulator accumulator2(data->type(), allocator()); + accumulator2.deserialize( + serialized->asFlatVector()->valueAt(0), allocator()); + ASSERT_EQ(accumulator2.size(), accumulator1.size()); + accumulatorSizeCheck(accumulator2); + + // Extract the contents of the deserialized accumulator. + // All the values extracted are in the uniqueValues set already. + auto copy = BaseVector::create(data->type(), accumulator2.size(), pool()); + accumulator2.extractValues(*copy, 0); + DecodedVector copyDecoded(*copy); + for (auto i = 0; i < copy->size(); ++i) { + if (copyDecoded.isNullAt(i)) { + ASSERT_EQ(i, nullPosition); + } else { + auto position = values.append(copyDecoded, i, allocator()); + ASSERT_TRUE(uniqueValues.contains(position)); + values.removeLast(position); + } + } + } + + HashStringAllocator* allocator() { + return allocator_.get(); + } + + std::unique_ptr allocator_{ + std::make_unique(pool())}; +}; + +TEST_F(SetAccumulatorTest, integral) { + auto data1 = makeFlatVector({1, 2, 3, 4, 5}); + testPrimitive(data1); + auto data2 = makeFlatVector({1, 2, 2, 3, 3, 4, 5, 5}); + testPrimitive(data2); + auto data3 = makeFlatVector({1, 2, 2, 3, 4, 5, 3, 1, 4}); + testPrimitive(data3); + auto data4 = makeNullableFlatVector({std::nullopt, 1, 2}); + testPrimitive(data4); +} + +TEST_F(SetAccumulatorTest, date) { + auto data = makeFlatVector({1, 2, 3, 4, 5}, DATE()); + testPrimitive(data); + data = makeFlatVector({1, 2, 2, 3, 3, 4, 5, 5}, DATE()); + testPrimitive(data); + data = makeFlatVector({1, 2, 2, 3, 4, 5, 3, 1, 4}, DATE()); + testPrimitive(data); + data = makeNullableFlatVector({1, 2, std::nullopt}, DATE()); + testPrimitive(data); +} + +TEST_F(SetAccumulatorTest, strings) { + auto data = + makeFlatVector({"abc", "non-inline string", "1234!@#$"}); + testPrimitive(data); + + data = makeFlatVector( + 
{"abc", + "non-inline string", + "non-inline string", + "reallylongstringreallylongstringreallylongstring", + "1234!@#$", + "abc"}); + testPrimitive(data); + + data = makeNullableFlatVector({"abc", std::nullopt, "def"}); + testPrimitive(data); +} + +TEST_F(SetAccumulatorTest, array) { + auto data = makeArrayVector({ + {1, 2, 3}, + {4, 5}, + {6, 7, 8, 9}, + {}, + }); + testComplexType(data); + + data = makeNullableArrayVector({ + {1, 2, 3}, + {4, 5}, + {std::nullopt}, + {6, 7, 8, 9}, + {}, + }); + testComplexType(data); + + data = makeArrayVector({ + {1, 2, 3}, + {1, 2, 3}, + {4, 5}, + {6, 7, 8, 9}, + {}, + {4, 5}, + {1, 2, 3}, + {}, + }); + testComplexType(data); +} + +TEST_F(SetAccumulatorTest, map) { + auto data = makeMapVector({ + {{1, 10.1213}, {2, 20}}, + {{3, 30}, {4, 40.258703570235497205}, {5, 50}}, + {{1, 10.4324}, {3, 30}, {4, 40.45209809}, {6, 60}}, + {}, + }); + testComplexType(data); + + data = makeNullableMapVector({ + {{{1, "abc"}, {2, "this is a non-inline string"}}}, + std::nullopt, + {{{3, "qrs"}, {4, "m"}, {5, "%&^%&^af489372843"}}}, + {{}}, + }); + testComplexType(data); + + // Has non-unique rows. + data = makeMapVector({ + {{1, 10}, {2, 20}}, + {{3, 30}, {4, 40}, {5, 50}}, + {{3, 30}, {4, 40}, {5, 50}}, + {{1, 10}, {2, 20}}, + {{1, 10}, {3, 30}, {4, 40}, {6, 60}}, + {}, + {{1, 10}, {2, 20}}, + {}, + {{3, 30}, {4, 40}, {5, 50}}, + }); + testComplexType(data); +} + +TEST_F(SetAccumulatorTest, row) { + auto data = makeRowVector({ + makeFlatVector({1, 2, 3, 4, 5}), + makeFlatVector( + {"abc", "this is a non-inline string", "def", "ghij", "klm"}), + makeFlatVector({11, 22, 33, 44, 55}), + }); + testComplexType(data); + + // Has non-unique rows. 
+ data = makeRowVector({ + makeFlatVector({1, 2, 3, 4, 2, 5, 3}), + makeFlatVector( + {10.1, 20.1234567, 30.35, 40, 20.1234567, 50.42309234, 30}), + makeFlatVector({11, 22, 33, 44, 22, 55, 33}, DATE()), + }); + testComplexType(data); + + data = makeRowVector({ + makeNullableFlatVector({1, 2, std::nullopt, 4, 5}), + makeNullableFlatVector({10, 20, 30, std::nullopt, 50}), + makeNullableFlatVector({std::nullopt, 22, 33, std::nullopt, 55}), + }); + testComplexType(data); +} + +} // namespace +} // namespace facebook::velox::aggregate::prestosql