From 80883f70ffacb28077839770c1b7b8a96720373a Mon Sep 17 00:00:00 2001 From: aditi-pandit Date: Tue, 27 Feb 2024 23:16:47 -0800 Subject: [PATCH] Add support to serialize/deserialize SetAccumulators --- velox/exec/SetAccumulator.h | 255 ++++++++++++++++++-- velox/exec/tests/CMakeLists.txt | 1 + velox/exec/tests/SetAccumulatorTest.cpp | 297 ++++++++++++++++++++++++ 3 files changed, 538 insertions(+), 15 deletions(-) create mode 100644 velox/exec/tests/SetAccumulatorTest.cpp diff --git a/velox/exec/SetAccumulator.h b/velox/exec/SetAccumulator.h index 14814d6385b8..eaac7ceca5c7 100644 --- a/velox/exec/SetAccumulator.h +++ b/velox/exec/SetAccumulator.h @@ -16,6 +16,8 @@ #pragma once #include + +#include "velox/common/base/IOUtils.h" #include "velox/common/memory/HashStringAllocator.h" #include "velox/exec/AddressableNonNullValueList.h" #include "velox/exec/Strings.h" @@ -29,6 +31,22 @@ namespace detail { /// Maintains a set of unique values. Non-null values are stored in F14FastSet. /// A separate flag tracks presence of the null value. +/// The SetAccumulator also tracks the order in which the values are added to +/// the accumulator (for ordered aggregations). So each value is associated with +/// an index of its position. + +/// SetAccumulator supports serialization/deserialization to/from an array of +/// byte streams. +/// These are used in the spilling logic of operators using SetAccumulator. + +/// The serialization format is : +/// i) The first element of the array is the index of the null value +/// (or -1 if no null value). +/// ii) The list of values (and optionally some metadata) in the order of +/// their positions. The null position (if one) is skipped. +/// iii) For scalar and string types, only the value is serialized in the +/// order of their indexes in the accumulator. For a complex type, a tuple +/// of (hash, value) is serialized. template < typename T, typename Hash = std::hash, @@ -86,6 +104,26 @@ struct SetAccumulator { } } + /// Deserializes accumulator from previously serialized value. + void deserialize( + const FlatVector& vector, + vector_size_t index, + vector_size_t size, + HashStringAllocator* /*allocator*/) { + // The serialized value is the nullIndex (kNoNullIndex if no null is + // present) followed by the unique values ordered by index. + deserializeNullIndex(vector.valueAt(index).data()); + + // Mark the nullPosition beyond values to correctly offset when reading the + // stream. + const auto nullPosition = nullIndex.has_value() ? nullIndex.value() : size; + for (auto i = 1, j = 0; i < size; i++, j++) { + T value = *reinterpret_cast(vector.valueAt(index + i).data()); + auto pos = (j < nullPosition) ? j : i; + uniqueValues.insert({value, pos}); + } + } + /// Returns number of unique values including null. size_t size() const { return uniqueValues.size() + (nullIndex.has_value() ? 1 : 0); @@ -106,10 +144,47 @@ struct SetAccumulator { : uniqueValues.size(); } + /// Serializes a sequence of VARBINARY values starting at result[index]. + /// This is used for the spill of this accumulator. + void serialize(const VectorPtr& result, vector_size_t index) { + auto* flatResult = result->as>(); + VELOX_CHECK_LE(uniqueValues.size() + 1, flatResult->size()); + + auto nullIndexValue = nullIndexSerializationValue(); + flatResult->set(index, StringView( + reinterpret_cast(&nullIndexValue), sizeof(vector_size_t))); + + // The null position is skipped when serializing values, so setting an out + // of bound value for no null position. + const auto nullPosition = + nullIndex.has_value() ? nullIndex.value() : uniqueValues.size(); + const auto sizeOfT = sizeof(T); + for (const auto& value : uniqueValues) { + auto pos = value.second; + auto offset = (pos < nullPosition ? pos : pos - 1) + index + 1; + flatResult->set(offset, + StringView(reinterpret_cast(&value.first), sizeOfT)); + } + } + void free(HashStringAllocator& allocator) { using UT = decltype(uniqueValues); uniqueValues.~UT(); } + + void deserializeNullIndex(const char* input) { + VELOX_CHECK(!nullIndex.has_value()); + auto streamNullIndex = *reinterpret_cast(input); + if (streamNullIndex != kNoNullIndex) { + nullIndex = streamNullIndex; + } + } + + vector_size_t nullIndexSerializationValue() { + return nullIndex.has_value() ? nullIndex.value() : kNoNullIndex; + } + + static const vector_size_t kNoNullIndex = -1; }; /// Maintains a set of unique strings. @@ -134,14 +209,7 @@ struct StringViewSetAccumulator { } } else { auto value = decoded.valueAt(index); - if (!value.isInline()) { - if (base.uniqueValues.contains(value)) { - return; - } - value = strings.append(value, *allocator); - } - base.uniqueValues.insert( - {value, base.nullIndex.has_value() ? cnt + 1 : cnt}); + addValue(value, base.nullIndex.has_value() ? cnt + 1 : cnt, allocator); } } @@ -158,6 +226,23 @@ struct StringViewSetAccumulator { } } + void deserialize( + const FlatVector& vector, + vector_size_t index, + vector_size_t size, + HashStringAllocator* allocator) { + base.deserializeNullIndex(vector.valueAt(index).data()); + + // Mark the nullPosition beyond values to correctly offset when reading the + // stream. + const auto nullPosition = + base.nullIndex.has_value() ? base.nullIndex.value() : size; + for (auto i = 1; i < size; i++) { + auto pos = i - 1 < nullPosition ? i - 1 : i; + addUniqueValue(vector.valueAt(index + i), pos, allocator); + } + } + size_t size() const { return base.size(); } @@ -168,11 +253,57 @@ struct StringViewSetAccumulator { return base.extractValues(values, offset); } + /// Serialize an array of VARBINARY representation starting from + /// result[index]. This is used for the spill of this accumulator. + void serialize(const VectorPtr& result, vector_size_t index) { + auto* flatResult = result->as>(); + VELOX_CHECK_LE(base.uniqueValues.size() + 1, flatResult->size()); + + auto nullIndexValue = base.nullIndexSerializationValue(); + flatResult->set(index, + StringView((const char*)&nullIndexValue, sizeof(vector_size_t))); + + // The null position is skipped when serializing values, so setting an out + // of bound value for no null position. + const auto nullPosition = base.nullIndex.has_value() + ? base.nullIndex.value() + : base.uniqueValues.size(); + for (const auto& value : base.uniqueValues) { + auto pos = value.second; + auto offset = (pos < nullPosition ? pos : pos - 1) + index + 1; + flatResult->set(offset, value.first); + } + } + void free(HashStringAllocator& allocator) { strings.free(allocator); using Base = decltype(base); base.~Base(); } + + private: + void addValue( + const StringView& value, + vector_size_t index, + HashStringAllocator* allocator) { + if (base.uniqueValues.contains(value)) { + return; + } + + addUniqueValue(value, index, allocator); + } + + void addUniqueValue( + const StringView& value, + vector_size_t index, + HashStringAllocator* allocator) { + VELOX_CHECK(!base.uniqueValues.contains(value)); + StringView valueCopy = value; + if (!valueCopy.isInline()) { + valueCopy = strings.append(value, *allocator); + } + base.uniqueValues.insert({valueCopy, index}); + } }; /// Maintains a set of unique arrays, maps or structs. @@ -187,6 +318,12 @@ struct ComplexTypeSetAccumulator { /// Stores unique non-null values. AddressableNonNullValueList values; + // Tracks the size of the biggest ComplexType in the set. This is used for + // allocating a temporary buffer during serialization. + size_t maxSize = 0; + + static constexpr size_t kSizeOfHash = sizeof(uint64_t); + ComplexTypeSetAccumulator(const TypePtr& type, HashStringAllocator* allocator) : base{ AddressableNonNullValueList::Hash{}, @@ -203,13 +340,9 @@ struct ComplexTypeSetAccumulator { base.nullIndex = cnt; } } else { - auto entry = values.append(decoded, index, allocator); - - if (!base.uniqueValues - .insert({entry, base.nullIndex.has_value() ? cnt + 1 : cnt}) - .second) { - values.removeLast(entry); - } + const auto entry = values.append(decoded, index, allocator); + const auto position = base.nullIndex.has_value() ? cnt + 1 : cnt; + addEntry(entry, position); } } @@ -226,6 +359,30 @@ struct ComplexTypeSetAccumulator { } } + void deserialize( + const FlatVector& vector, + vector_size_t index, + vector_size_t size, + HashStringAllocator* allocator) { + base.deserializeNullIndex(vector.valueAt(index).data()); + + // Mark the nullPosition beyond values to correctly offset when reading the + // stream. + const auto nullPosition = + base.nullIndex.has_value() ? base.nullIndex.value() : size; + for (auto i = 1; i < size; i++) { + auto value = vector.valueAt(index + i); + auto stream = common::InputByteStream(value.data()); + auto hash = stream.read(); + auto length = value.size() - kSizeOfHash; + auto contents = StringView(stream.read(length), length); + auto position = values.appendSerialized(contents, allocator); + + auto pos = (i - 1 < nullPosition) ? i - 1 : i; + addEntry({position, contents.size(), hash}, pos); + } + } + size_t size() const { return base.size(); } @@ -243,11 +400,79 @@ struct ComplexTypeSetAccumulator { return base.uniqueValues.size() + (base.nullIndex.has_value() ? 1 : 0); } + /// Starting from result[index] append a sequence of VARBINARY values for + /// serialization for spilling.. + void serialize(const VectorPtr& result, vector_size_t index) { + auto* flatResult = result->as>(); + VELOX_CHECK_LE(base.uniqueValues.size() + 1, flatResult->size()); + + auto nullIndexValue = base.nullIndexSerializationValue(); + flatResult->set(index, + StringView((const char*)&nullIndexValue, sizeof(vector_size_t))); + + // Temporary buffer used during serialization. + auto* tempBuffer = + (char*)(flatResult->pool()->allocate(maxSize + kSizeOfHash)); + + // The null position is skipped when serializing values, so setting an out + // of bound value for no null position. + const auto nullPosition = base.nullIndex.has_value() + ? base.nullIndex.value() + : base.uniqueValues.size(); + for (const auto& value : base.uniqueValues) { + auto pos = value.second; + auto offset = (pos < nullPosition ? pos : pos - 1) + index + 1; + SerializationStream stream(tempBuffer, kSizeOfHash + value.first.size); + // Complex type hash. + stream.append(&value.first.hash, kSizeOfHash); + // Complex type value. + stream.append(value.first); + flatResult->set( + offset, StringView(tempBuffer, kSizeOfHash + value.first.size)); + } + + flatResult->pool()->free(tempBuffer, maxSize + kSizeOfHash); + } + void free(HashStringAllocator& allocator) { values.free(allocator); using Base = decltype(base); base.~Base(); } + + private: + // Simple stream abstraction for serialization logic. 'append' calls to concat + // values to the stream (for the input buffer) are exposed to the user. + struct SerializationStream { + char* rawBuffer; + const vector_size_t totalSize; + vector_size_t offset = 0; + + SerializationStream(char* buffer, vector_size_t totalSize) + : rawBuffer(buffer), totalSize(totalSize) {} + + void append(const void* value, vector_size_t size) { + VELOX_CHECK_LE(offset + size, totalSize); + memcpy(rawBuffer + offset, value, size); + offset += size; + } + + void append(const AddressableNonNullValueList::Entry& entry) { + VELOX_CHECK_LE(offset + entry.size, totalSize); + AddressableNonNullValueList::readSerialized(entry, rawBuffer + offset); + offset += entry.size; + } + }; + + void addEntry( + const AddressableNonNullValueList::Entry& entry, + vector_size_t index) { + if (!base.uniqueValues.insert({entry, index}).second) { + values.removeLast(entry); + } else { + maxSize = maxSize < entry.size ? entry.size : maxSize; + } + } }; template diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index 21f1b70609ef..ec9d6226eed3 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -64,6 +64,7 @@ add_executable( RowContainerTest.cpp RowNumberTest.cpp MarkDistinctTest.cpp + SetAccumulatorTest.cpp SharedArbitratorTest.cpp SpillTest.cpp SpillOperatorGroupTest.cpp diff --git a/velox/exec/tests/SetAccumulatorTest.cpp b/velox/exec/tests/SetAccumulatorTest.cpp new file mode 100644 index 000000000000..bc0dcb025ea4 --- /dev/null +++ b/velox/exec/tests/SetAccumulatorTest.cpp @@ -0,0 +1,297 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/exec/SetAccumulator.h" + +#include +#include "velox/exec/AddressableNonNullValueList.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +namespace facebook::velox::aggregate::prestosql { + +namespace { + +// The tests in this class validate the following +// (for both Primitive and Complex types) : +// i) Builds a SetAccumulator from the input data. +// ii) Tracks the unique values in the input data for validation. +// iii) Serializes the SetAccumulator and de-serializes the result in a second +// accumulator. +// The test validates that both accumulators have the same contents and the +// contents of the deserialized accumulator comprise the unique values from +// the input data. +class SetAccumulatorTest : public testing::Test, public test::VectorTestBase { + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + } + + template + void testPrimitive(const VectorPtr& data) { + std::unordered_set uniqueValues; + SetAccumulator accumulator(data->type(), allocator()); + DecodedVector decodedVector(*data); + vector_size_t nullPosition = -1; + for (auto i = 0; i < data->size(); ++i) { + if (decodedVector.isNullAt(i)) { + nullPosition = i; + } + accumulator.addValue(decodedVector, i, allocator()); + uniqueValues.insert(decodedVector.valueAt(i)); + } + ASSERT_EQ(accumulator.size(), uniqueValues.size()); + + auto serializedSize = + nullPosition == -1 ? uniqueValues.size() + 1 : uniqueValues.size(); + auto serialized = BaseVector::create(VARBINARY(), serializedSize, pool()); + accumulator.serialize(serialized, 0); + + // Initialize another accumulator from the serialized vector. + SetAccumulator accumulator2(data->type(), allocator()); + auto flatSerialized = serialized->template asFlatVector(); + accumulator2.deserialize( + *flatSerialized, 0, serialized->size(), allocator()); + + // Extract the contents of the accumulator. The contents should match + // all the uniqueValues. + auto copy = BaseVector::create(data->type(), accumulator2.size(), pool()); + auto copyFlat = copy->template asFlatVector(); + accumulator2.extractValues(*copyFlat, 0); + + ASSERT_EQ(copy->size(), accumulator.size()); + for (auto i = 0; i < copy->size(); i++) { + if (copyFlat->isNullAt(i)) { + ASSERT_EQ(i, nullPosition); + } else { + ASSERT_TRUE(uniqueValues.count(copyFlat->valueAt(i)) != 0); + } + } + } + + void testComplexType(const VectorPtr& data) { + using T = AddressableNonNullValueList::Entry; + using Set = folly::F14FastSet< + T, + AddressableNonNullValueList::Hash, + AddressableNonNullValueList::EqualTo, + AlignedStlAllocator>; + + // Unique values set used for validation in the tests. + AddressableNonNullValueList values; + Set uniqueValues{ + 0, + AddressableNonNullValueList::Hash{}, + AddressableNonNullValueList::EqualTo{data->type()}, + AlignedStlAllocator(allocator())}; + + // Build an accumulator from the input data. Also create a set of the + // unique values for validation. + SetAccumulator accumulator1(data->type(), allocator()); + DecodedVector decodedVector(*data); + vector_size_t nullPosition = -1; + for (auto i = 0; i < data->size(); ++i) { + accumulator1.addValue(decodedVector, i, allocator()); + if (!decodedVector.isNullAt(i)) { + auto entry = values.append(decodedVector, i, allocator()); + if (uniqueValues.contains(entry)) { + values.removeLast(entry); + continue; + } + ASSERT_TRUE(uniqueValues.insert(entry).second); + ASSERT_TRUE(uniqueValues.contains(entry)); + ASSERT_FALSE(uniqueValues.insert(entry).second); + } else { + nullPosition = i; + } + } + + auto accumulatorSizeCheck = + [&](const SetAccumulator& accumulator) { + if (nullPosition != -1) { + ASSERT_EQ(accumulator.size(), uniqueValues.size() + 1); + } else { + ASSERT_EQ(accumulator.size(), uniqueValues.size()); + } + }; + accumulatorSizeCheck(accumulator1); + + // Serialize the accumulator. + auto serialized = + BaseVector::create(VARBINARY(), uniqueValues.size() + 1, pool()); + accumulator1.serialize(serialized, 0); + + // Initialize another accumulator from the serialized vector. + SetAccumulator accumulator2(data->type(), allocator()); + auto serializedFlat = serialized->asFlatVector(); + accumulator2.deserialize( + *serializedFlat, 0, serialized->size(), allocator()); + ASSERT_EQ(accumulator2.size(), accumulator1.size()); + accumulatorSizeCheck(accumulator2); + + // Extract the contents of the deserialized accumulator. + // All the values extracted are in the uniqueValues set already. + auto copy = BaseVector::create(data->type(), accumulator2.size(), pool()); + accumulator2.extractValues(*copy, 0); + DecodedVector copyDecoded(*copy); + for (auto i = 0; i < copy->size(); ++i) { + if (copyDecoded.isNullAt(i)) { + ASSERT_EQ(i, nullPosition); + } else { + auto position = values.append(copyDecoded, i, allocator()); + ASSERT_TRUE(uniqueValues.contains(position)); + values.removeLast(position); + } + } + } + + HashStringAllocator* allocator() { + return allocator_.get(); + } + + std::unique_ptr allocator_{ + std::make_unique(pool())}; +}; + +TEST_F(SetAccumulatorTest, integral) { + auto data1 = makeFlatVector({1, 2, 3, 4, 5}); + testPrimitive(data1); + auto data2 = makeFlatVector({1, 2, 2, 3, 3, 4, 5, 5}); + testPrimitive(data2); + auto data3 = makeFlatVector({1, 2, 2, 3, 4, 5, 3, 1, 4}); + testPrimitive(data3); + auto data4 = makeNullableFlatVector({std::nullopt, 1, 2}); + testPrimitive(data4); +} + +TEST_F(SetAccumulatorTest, date) { + auto data = makeFlatVector({1, 2, 3, 4, 5}, DATE()); + testPrimitive(data); + data = makeFlatVector({1, 2, 2, 3, 3, 4, 5, 5}, DATE()); + testPrimitive(data); + data = makeFlatVector({1, 2, 2, 3, 4, 5, 3, 1, 4}, DATE()); + testPrimitive(data); + data = makeNullableFlatVector({1, 2, std::nullopt}, DATE()); + testPrimitive(data); +} + +TEST_F(SetAccumulatorTest, strings) { + auto data = + makeFlatVector({"abc", "non-inline string", "1234!@#$"}); + testPrimitive(data); + + data = makeFlatVector( + {"abc", + "non-inline string", + "non-inline string", + "reallylongstringreallylongstringreallylongstring", + "1234!@#$", + "abc"}); + testPrimitive(data); + + data = makeNullableFlatVector({"abc", std::nullopt, "def"}); + testPrimitive(data); +} + +TEST_F(SetAccumulatorTest, array) { + auto data = makeArrayVector({ + {1, 2, 3}, + {4, 5}, + {6, 7, 8, 9}, + {}, + }); + testComplexType(data); + + data = makeNullableArrayVector({ + {1, 2, 3}, + {4, 5}, + {std::nullopt}, + {6, 7, 8, 9}, + {}, + }); + testComplexType(data); + + data = makeArrayVector({ + {1, 2, 3}, + {1, 2, 3}, + {4, 5}, + {6, 7, 8, 9}, + {}, + {4, 5}, + {1, 2, 3}, + {}, + }); + testComplexType(data); +} + +TEST_F(SetAccumulatorTest, map) { + auto data = makeMapVector({ + {{1, 10.1213}, {2, 20}}, + {{3, 30}, {4, 40.258703570235497205}, {5, 50}}, + {{1, 10.4324}, {3, 30}, {4, 40.45209809}, {6, 60}}, + {}, + }); + testComplexType(data); + + data = makeNullableMapVector({ + {{{1, "abc"}, {2, "this is a non-inline string"}}}, + std::nullopt, + {{{3, "qrs"}, {4, "m"}, {5, "%&^%&^af489372843"}}}, + {{}}, + }); + testComplexType(data); + + // Has non-unique rows. + data = makeMapVector({ + {{1, 10}, {2, 20}}, + {{3, 30}, {4, 40}, {5, 50}}, + {{3, 30}, {4, 40}, {5, 50}}, + {{1, 10}, {2, 20}}, + {{1, 10}, {3, 30}, {4, 40}, {6, 60}}, + {}, + {{1, 10}, {2, 20}}, + {}, + {{3, 30}, {4, 40}, {5, 50}}, + }); + testComplexType(data); +} + +TEST_F(SetAccumulatorTest, row) { + auto data = makeRowVector({ + makeFlatVector({1, 2, 3, 4, 5}), + makeFlatVector( + {"abc", "this is a non-inline string", "def", "ghij", "klm"}), + makeFlatVector({11, 22, 33, 44, 55}), + }); + testComplexType(data); + + // Has non-unique rows. + data = makeRowVector({ + makeFlatVector({1, 2, 3, 4, 2, 5, 3}), + makeFlatVector( + {10.1, 20.1234567, 30.35, 40, 20.1234567, 50.42309234, 30}), + makeFlatVector({11, 22, 33, 44, 22, 55, 33}, DATE()), + }); + testComplexType(data); + + data = makeRowVector({ + makeNullableFlatVector({1, 2, std::nullopt, 4, 5}), + makeNullableFlatVector({10, 20, 30, std::nullopt, 50}), + makeNullableFlatVector({std::nullopt, 22, 33, std::nullopt, 55}), + }); + testComplexType(data); +} + +} // namespace +} // namespace facebook::velox::aggregate::prestosql