Skip to content

Commit a02646f

Browse files
add sparksql function bloom_filter_agg and might_contain
Change bit_ size to fix TPCDS performance
1 parent 277e0bf commit a02646f

File tree

14 files changed

+567
-38
lines changed

14 files changed

+567
-38
lines changed

velox/common/base/BloomFilter.h

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
#include <folly/Hash.h>
2323

2424
#include "velox/common/base/BitUtil.h"
25+
#include "velox/common/base/Exceptions.h"
26+
#include "velox/common/base/IOUtils.h"
27+
#include "velox/type/StringView.h"
2528

2629
namespace facebook::velox {
2730

@@ -31,9 +34,15 @@ namespace facebook::velox {
3134
// expected entry, we get ~2% false positives. 'hashInput' determines
3235
// if the value added or checked needs to be hashed. If this is false,
3336
// we assume that the input is already a 64 bit hash number.
34-
template <bool hashInput = true>
37+
// InputType:
38+
// InputType can be any type supported by folly::hasher when hashInput = true
39+
// InputType must already be a 64 bit hash number (e.g. uint64_t) when hashInput = false
40+
template <class InputType = uint64_t, bool hashInput = true>
3541
class BloomFilter {
3642
public:
43+
BloomFilter(){};
44+
BloomFilter(std::vector<uint64_t> bits) : bits_(bits){};
45+
3746
// Prepares 'this' for use with an expected 'capacity'
3847
// entries. Drops any prior content.
3948
void reset(int32_t capacity) {
@@ -42,18 +51,60 @@ class BloomFilter {
4251
bits_.resize(std::max<int32_t>(4, bits::nextPowerOfTwo(capacity) / 4));
4352
}
4453

54+
bool isSet() {
55+
return bits_.size() > 0;
56+
}
57+
4558
// Adds 'value'.
46-
void insert(uint64_t value) {
59+
void insert(InputType value) {
4760
set(bits_.data(),
4861
bits_.size(),
49-
hashInput ? folly::hasher<uint64_t>()(value) : value);
62+
hashInput ? folly::hasher<InputType>()(value) : value);
5063
}
5164

52-
bool mayContain(uint64_t value) const {
65+
bool mayContain(InputType value) const {
5366
return test(
5467
bits_.data(),
5568
bits_.size(),
56-
hashInput ? folly::hasher<uint64_t>()(value) : value);
69+
hashInput ? folly::hasher<InputType>()(value) : value);
70+
}
71+
72+
// Combines the two bloomFilter bits_ using bitwise OR.
73+
void merge(BloomFilter& bloomFilter) {
74+
if (bits_.size() == 0) {
75+
bits_ = bloomFilter.bits_;
76+
return;
77+
} else if (bloomFilter.bits_.size() == 0){
78+
VELOX_FAIL("Input bit length should not be 0");
79+
}
80+
VELOX_CHECK_EQ(bits_.size(), bloomFilter.bits_.size());
81+
for (auto i = 0; i < bloomFilter.bits_.size(); i++) {
82+
bits_[i] |= bloomFilter.bits_[i];
83+
}
84+
}
85+
86+
uint32_t serializedSize() {
87+
return 4 /* number of bits */
88+
+ bits_.size() * 8;
89+
}
90+
91+
// Writes the filter into the memory that 'output' points at: first the
// number of words as an int32_t, then each uint64_t word in order.
// 'output' must reference a writable buffer of at least serializedSize()
// bytes; a shorter view is rejected instead of being overrun.
// NOTE(review): the write goes through output.data(); if StringView stores
// short strings inline, a tiny view would receive the bytes in its own
// storage rather than in the caller's buffer — confirm against callers.
void serialize(StringView& output) const {
  // Computed locally so this method does not depend on other members'
  // constness.
  const size_t requiredBytes =
      sizeof(int32_t) + bits_.size() * sizeof(uint64_t);
  VELOX_CHECK_GE(
      output.size(),
      requiredBytes,
      "Output buffer is too small to hold the serialized BloomFilter");

  // StringView exposes const data; the caller hands it in as a write target.
  char* outputBuffer = const_cast<char*>(output.data());
  common::OutputByteStream stream(outputBuffer);
  stream.appendOne(static_cast<int32_t>(bits_.size()));
  for (auto bit : bits_) {
    stream.appendOne(bit);
  }
}
99+
100+
static void deserialize(const char* serialized, BloomFilter& output) {
101+
common::InputByteStream stream(serialized);
102+
auto size = stream.read<int32_t>();
103+
output.bits_.resize(size);
104+
auto bitsdata = reinterpret_cast<const uint64_t*>(serialized + stream.offset());
105+
for (auto i = 0; i < size; i++) {
106+
output.bits_[i] = bitsdata[i];
107+
}
57108
}
58109

59110
private:

velox/common/base/tests/BloomFilterTest.cpp

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ using namespace facebook::velox;
2424

2525
TEST(BloomFilterTest, basic) {
2626
constexpr int32_t kSize = 1024;
27-
BloomFilter bloom;
27+
BloomFilter<int32_t> bloom;
2828
bloom.reset(kSize);
2929
for (auto i = 0; i < kSize; ++i) {
3030
bloom.insert(i);
@@ -37,3 +37,46 @@ TEST(BloomFilterTest, basic) {
3737
}
3838
EXPECT_GT(2, 100 * numFalsePositives / kSize);
3939
}
40+
41+
// Round-trips a populated filter through serialize()/deserialize() and checks
// that every inserted value is still reported and the serialized size matches.
TEST(BloomFilterTest, serialize) {
  constexpr int32_t kSize = 1024;

  BloomFilter<int32_t> bloom;
  bloom.reset(kSize);
  for (int value = 0; value < kSize; ++value) {
    bloom.insert(value);
  }

  // Zero-filled buffer of exactly the size the filter asks for.
  std::string data(bloom.serializedSize(), '\0');
  StringView serialized(data.data(), data.size());
  bloom.serialize(serialized);

  BloomFilter<int32_t> deserialized;
  BloomFilter<int32_t>::deserialize(data.data(), deserialized);

  for (int value = 0; value < kSize; ++value) {
    EXPECT_TRUE(deserialized.mayContain(value));
  }

  EXPECT_EQ(bloom.serializedSize(), deserialized.serializedSize());
}
60+
61+
// Merges two filters populated with disjoint ranges and checks that the
// merged filter reports every value from both ranges.
TEST(BloomFilterTest, merge) {
  constexpr int32_t kSize = 10;

  // Holds [0, kSize).
  BloomFilter<int32_t> bloom;
  bloom.reset(kSize);
  for (int value = 0; value < kSize; ++value) {
    bloom.insert(value);
  }

  // Holds [kSize, 2 * kSize).
  BloomFilter<int32_t> merge;
  merge.reset(kSize);
  for (int32_t value = kSize; value < kSize + kSize; ++value) {
    merge.insert(value);
  }

  bloom.merge(merge);

  for (int value = 0; value < kSize + kSize; ++value) {
    EXPECT_TRUE(bloom.mayContain(value));
  }

  EXPECT_EQ(bloom.serializedSize(), merge.serializedSize());
}

velox/functions/sparksql/CMakeLists.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,12 @@ target_link_libraries(
3636
set_property(TARGET velox_functions_spark PROPERTY JOB_POOL_COMPILE
3737
high_memory_pool)
3838

39+
if(${VELOX_ENABLE_AGGREGATES})
40+
add_subdirectory(aggregates)
41+
endif()
42+
3943
if(${VELOX_BUILD_TESTING})
4044
add_subdirectory(tests)
41-
add_subdirectory(aggregates)
4245
endif()
4346

4447
if(${VELOX_ENABLE_BENCHMARKS})
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#include "velox/common/base/BloomFilter.h"
17+
#include "velox/expression/VectorFunction.h"
18+
#include "velox/functions/Macros.h"
19+
#include "velox/functions/lib/string/StringImpl.h"
20+
21+
namespace facebook::velox::functions::sparksql {
22+
23+
template <typename T>
24+
struct MightContainFunction {
25+
VELOX_DEFINE_FUNCTION_TYPES(T);
26+
27+
FOLLY_ALWAYS_INLINE bool call(
28+
out_type<bool>& result,
29+
const arg_type<Varbinary>& serializedBloom,
30+
const arg_type<int64_t>& value) {
31+
BloomFilter<int64_t, false> output;
32+
BloomFilter<int64_t, false>::deserialize(
33+
std::string(std::string_view(serializedBloom)).data(), output);
34+
return output.mayContain(value);
35+
}
36+
};
37+
38+
} // namespace facebook::velox::functions::sparksql

velox/functions/sparksql/Register.cpp

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "velox/functions/sparksql/Hash.h"
2929
#include "velox/functions/sparksql/In.h"
3030
#include "velox/functions/sparksql/LeastGreatest.h"
31+
#include "velox/functions/sparksql/MightContain.h"
3132
#include "velox/functions/sparksql/RegexFunctions.h"
3233
#include "velox/functions/sparksql/RegisterArithmetic.h"
3334
#include "velox/functions/sparksql/RegisterCompare.h"
@@ -156,22 +157,16 @@ void registerFunctions(const std::string& prefix) {
156157
{prefix + "millisecond"});
157158
registerFunction<MillisecondFunction, int32_t, TimestampWithTimezone>(
158159
{prefix + "millisecond"});
159-
registerFunction<SecondFunction, int32_t, Date>(
160-
{prefix + "second"});
161-
registerFunction<SecondFunction, int32_t, Timestamp>(
162-
{prefix + "second"});
160+
registerFunction<SecondFunction, int32_t, Date>({prefix + "second"});
161+
registerFunction<SecondFunction, int32_t, Timestamp>({prefix + "second"});
163162
registerFunction<SecondFunction, int32_t, TimestampWithTimezone>(
164163
{prefix + "second"});
165-
registerFunction<MinuteFunction, int32_t, Date>(
166-
{prefix + "minute"});
167-
registerFunction<MinuteFunction, int32_t, Timestamp>(
168-
{prefix + "minute"});
164+
registerFunction<MinuteFunction, int32_t, Date>({prefix + "minute"});
165+
registerFunction<MinuteFunction, int32_t, Timestamp>({prefix + "minute"});
169166
registerFunction<MinuteFunction, int32_t, TimestampWithTimezone>(
170167
{prefix + "minute"});
171-
registerFunction<HourFunction, int32_t, Date>(
172-
{prefix + "hour"});
173-
registerFunction<HourFunction, int32_t, Timestamp>(
174-
{prefix + "hour"});
168+
registerFunction<HourFunction, int32_t, Date>({prefix + "hour"});
169+
registerFunction<HourFunction, int32_t, Timestamp>({prefix + "hour"});
175170
registerFunction<HourFunction, int32_t, TimestampWithTimezone>(
176171
{prefix + "hour"});
177172
registerFunction<DayFunction, int32_t, Date>(
@@ -180,34 +175,26 @@ void registerFunctions(const std::string& prefix) {
180175
{prefix + "day", prefix + "day_of_month"});
181176
registerFunction<DayFunction, int32_t, TimestampWithTimezone>(
182177
{prefix + "day", prefix + "day_of_month"});
183-
registerFunction<DayOfWeekFunction, int32_t, Date>(
184-
{prefix + "day_of_week"});
178+
registerFunction<DayOfWeekFunction, int32_t, Date>({prefix + "day_of_week"});
185179
registerFunction<DayOfWeekFunction, int32_t, Timestamp>(
186180
{prefix + "day_of_week"});
187181
registerFunction<DayOfWeekFunction, int32_t, TimestampWithTimezone>(
188182
{prefix + "day_of_week"});
189-
registerFunction<DayOfYearFunction, int32_t, Date>(
190-
{prefix + "day_of_year"});
183+
registerFunction<DayOfYearFunction, int32_t, Date>({prefix + "day_of_year"});
191184
registerFunction<DayOfYearFunction, int32_t, Timestamp>(
192185
{prefix + "day_of_year"});
193186
registerFunction<DayOfYearFunction, int32_t, TimestampWithTimezone>(
194187
{prefix + "day_of_year"});
195-
registerFunction<MonthFunction, int32_t, Date>(
196-
{prefix + "month"});
197-
registerFunction<MonthFunction, int32_t, Timestamp>(
198-
{prefix + "month"});
188+
registerFunction<MonthFunction, int32_t, Date>({prefix + "month"});
189+
registerFunction<MonthFunction, int32_t, Timestamp>({prefix + "month"});
199190
registerFunction<MonthFunction, int32_t, TimestampWithTimezone>(
200191
{prefix + "month"});
201-
registerFunction<QuarterFunction, int32_t, Date>(
202-
{prefix + "quarter"});
203-
registerFunction<QuarterFunction, int32_t, Timestamp>(
204-
{prefix + "quarter"});
192+
registerFunction<QuarterFunction, int32_t, Date>({prefix + "quarter"});
193+
registerFunction<QuarterFunction, int32_t, Timestamp>({prefix + "quarter"});
205194
registerFunction<QuarterFunction, int32_t, TimestampWithTimezone>(
206195
{prefix + "quarter"});
207-
registerFunction<YearFunction, int32_t, Date>(
208-
{prefix + "year"});
209-
registerFunction<YearFunction, int32_t, Timestamp>(
210-
{prefix + "year"});
196+
registerFunction<YearFunction, int32_t, Date>({prefix + "year"});
197+
registerFunction<YearFunction, int32_t, Timestamp>({prefix + "year"});
211198
registerFunction<YearFunction, int32_t, TimestampWithTimezone>(
212199
{prefix + "year"});
213200
registerFunction<YearOfWeekFunction, int32_t, Date>(
@@ -216,6 +203,10 @@ void registerFunctions(const std::string& prefix) {
216203
{prefix + "year_of_week"});
217204
registerFunction<YearOfWeekFunction, int32_t, TimestampWithTimezone>(
218205
{prefix + "year_of_week"});
206+
207+
// Register bloom filter function
208+
registerFunction<MightContainFunction, bool, Varbinary, int64_t>(
209+
{prefix + "might_contain"});
219210
}
220211

221212
} // namespace sparksql

0 commit comments

Comments
 (0)