Skip to content

Commit d930888

Browse files
committed
[optimize](parquet-reader) Optimize performance by using the Parquet bloom filter.
1 parent 5731d0b commit d930888

File tree

22 files changed

+2155
-213
lines changed

22 files changed

+2155
-213
lines changed

be/src/olap/column_predicate.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,10 @@ class ColumnPredicate {
216216
return false;
217217
}
218218

219+
// Probe a Parquet block-split bloom filter for this predicate.
// The base implementation cannot prune anything, so it conservatively
// reports "may contain matching rows"; EQ / IN_LIST predicates override it.
virtual bool evaluate_and(const vectorized::ParquetBlockSplitBloomFilter* bf) const {
    return true;
}
222+
219223
virtual bool evaluate_and(const BloomFilter* bf) const { return true; }
220224

221225
virtual bool evaluate_and(const StringRef* dict_words, const size_t dict_count) const {

be/src/olap/comparison_predicate.h

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -205,20 +205,30 @@ class ComparisonPredicateBase : public ColumnPredicate {
205205
}
206206

207207
// Row-group level pruning. First consult the column's min/max statistics,
// then (EQ only) the Parquet bloom filter. Returns false only when the row
// group provably contains no matching rows; any doubt keeps it (true).
bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
    bool may_match = true;
    if ((*statistic->get_stat_func)(statistic, column_id())) {
        vectorized::Field min_field;
        vectorized::Field max_field;
        auto st = vectorized::ParquetPredicate::parse_min_max_value(
                statistic->col_schema, statistic->encoded_min_value,
                statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field);
        // Unparsable statistics cannot prune; keep "may match".
        if (st.ok()) [[likely]] {
            may_match = camp_field(min_field, max_field);
        }
    }

    if constexpr (PT == PredicateType::EQ) {
        // min/max could not prune: fall through to the more precise bloom
        // filter when one is available for this column.
        if (may_match && statistic->get_bloom_filter_func != nullptr &&
            (*statistic->get_bloom_filter_func)(statistic, column_id()) &&
            statistic->bloom_filter) {
            return evaluate_and(statistic->bloom_filter.get());
        }
    }
    return may_match;
}
223233

224234
bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
@@ -343,6 +353,43 @@ class ComparisonPredicateBase : public ColumnPredicate {
343353
return PT == PredicateType::EQ && !ngram;
344354
}
345355

356+
// Row-group pruning via the Parquet block-split bloom filter (EQ only).
// Returns true when the value may be present, false when it is definitely
// absent. Unsupported types conservatively return true (accept).
bool evaluate_and(const vectorized::ParquetBlockSplitBloomFilter* bf) const override {
    if constexpr (PT == PredicateType::EQ) {
        // test_bytes takes a const char*, so no const_cast is needed.
        auto test_bytes = [&]<typename V>(const V& value) {
            return bf->test_bytes(reinterpret_cast<const char*>(&value), sizeof(V));
        };

        // Only support Parquet native types where physical == logical representation
        if constexpr (Type == PrimitiveType::TYPE_BOOLEAN) {
            // BOOLEAN -> hash as int32 (Parquet bool stored as int32)
            int32_t int32_value = static_cast<int32_t>(_value);
            return test_bytes(int32_value);
        } else if constexpr (Type == PrimitiveType::TYPE_TINYINT ||
                             Type == PrimitiveType::TYPE_SMALLINT ||
                             Type == PrimitiveType::TYPE_INT) {
            // Small integers are physically int32 in Parquet -> hash as int32
            // (keeps this consistent with InListPredicateBase::evaluate_and).
            int32_t int32_value = static_cast<int32_t>(_value);
            return test_bytes(int32_value);
        } else if constexpr (Type == PrimitiveType::TYPE_BIGINT) {
            // BIGINT -> hash as int64
            return test_bytes(_value);
        } else if constexpr (Type == PrimitiveType::TYPE_FLOAT) {
            // FLOAT -> hash as float
            return test_bytes(_value);
        } else if constexpr (Type == PrimitiveType::TYPE_DOUBLE) {
            // DOUBLE -> hash as double
            return test_bytes(_value);
        } else if constexpr (std::is_same_v<T, StringRef>) {
            // VARCHAR/STRING -> hash raw bytes
            return bf->test_bytes(_value.data, _value.size);
        } else {
            // Unsupported types: return true (accept)
            return true;
        }
    } else {
        LOG(FATAL) << "Bloom filter is not supported by predicate type.";
        return true;
    }
}
392+
346393
void evaluate_or(const vectorized::IColumn& column, const uint16_t* sel, uint16_t size,
347394
bool* flags) const override {
348395
_evaluate_bit<false>(column, sel, size, flags);

be/src/olap/in_list_predicate.h

Lines changed: 74 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -313,20 +313,30 @@ class InListPredicateBase : public ColumnPredicate {
313313
}
314314

315315
// Row-group level pruning. First consult the column's min/max statistics,
// then (IN_LIST only) the Parquet bloom filter. Returns false only when the
// row group provably contains no matching rows; any doubt keeps it (true).
bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
    bool may_match = true;
    if ((*statistic->get_stat_func)(statistic, column_id())) {
        vectorized::Field min_field;
        vectorized::Field max_field;
        auto st = vectorized::ParquetPredicate::parse_min_max_value(
                statistic->col_schema, statistic->encoded_min_value,
                statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field);
        // Unparsable statistics cannot prune; keep "may match".
        if (st.ok()) [[likely]] {
            may_match = camp_field(min_field, max_field);
        }
    }

    if constexpr (PT == PredicateType::IN_LIST) {
        // min/max could not prune: fall through to the more precise bloom
        // filter when one is available for this column.
        if (may_match && statistic->get_bloom_filter_func != nullptr &&
            (*statistic->get_bloom_filter_func)(statistic, column_id()) &&
            statistic->bloom_filter) {
            return evaluate_and(statistic->bloom_filter.get());
        }
    }
    return may_match;
}
331341

332342
bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
@@ -445,6 +455,58 @@ class InListPredicateBase : public ColumnPredicate {
445455
return get_in_list_ignore_thredhold(_values->size());
446456
}
447457

458+
// Row-group pruning via the Parquet block-split bloom filter (IN_LIST only).
// Returns true as soon as any listed value may be present; false only when
// every value is definitely absent. Unsupported types return true (accept).
bool evaluate_and(const vectorized::ParquetBlockSplitBloomFilter* bf) const override {
    if constexpr (PT == PredicateType::IN_LIST) {
        // Hoisted out of the loop (it is invariant); test_bytes takes a
        // const char*, so no const_cast is needed.
        auto test_bytes = [&]<typename V>(const V& val) {
            return bf->test_bytes(reinterpret_cast<const char*>(&val), sizeof(V));
        };

        HybridSetBase::IteratorBase* iter = _values->begin();
        while (iter->has_next()) {
            const T* value = reinterpret_cast<const T*>(iter->get_value());

            // Small integers (TINYINT, SMALLINT, INTEGER) -> hash as int32
            if constexpr (Type == PrimitiveType::TYPE_TINYINT ||
                          Type == PrimitiveType::TYPE_SMALLINT ||
                          Type == PrimitiveType::TYPE_INT) {
                int32_t int32_value = static_cast<int32_t>(*value);
                if (test_bytes(int32_value)) {
                    return true;
                }
            } else if constexpr (Type == PrimitiveType::TYPE_BIGINT) {
                // BIGINT -> hash as int64
                if (test_bytes(*value)) {
                    return true;
                }
            } else if constexpr (Type == PrimitiveType::TYPE_DOUBLE) {
                // DOUBLE -> hash as double
                if (test_bytes(*value)) {
                    return true;
                }
            } else if constexpr (Type == PrimitiveType::TYPE_FLOAT) {
                // FLOAT -> hash as float
                if (test_bytes(*value)) {
                    return true;
                }
            } else if constexpr (std::is_same_v<T, StringRef>) {
                // VARCHAR/STRING -> hash raw bytes
                if (bf->test_bytes(value->data, value->size)) {
                    return true;
                }
            } else {
                // Unsupported types: return true (accept)
                return true;
            }
            iter->next();
        }
        // No value in the list can be present in this row group.
        return false;
    } else {
        LOG(FATAL) << "Bloom filter is not supported by predicate type.";
        return true;
    }
}
509+
448510
private:
449511
uint16_t _evaluate_inner(const vectorized::IColumn& column, uint16_t* sel,
450512
uint16_t size) const override {

be/src/olap/rowset/segment_v2/bloom_filter.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,9 @@ class BloomFilter {
102102
return this->init(optimal_bit_num(n, fpp) / 8, strategy);
103103
}
104104

105-
Status init(uint64_t filter_size) { return init(filter_size, HASH_MURMUR3_X64_64); }
105+
virtual Status init(uint64_t filter_size) { return init(filter_size, HASH_MURMUR3_X64_64); }
106106

107-
Status init(uint64_t filter_size, HashStrategyPB strategy) {
107+
virtual Status init(uint64_t filter_size, HashStrategyPB strategy) {
108108
if (strategy == HASH_MURMUR3_X64_64) {
109109
_hash_func = murmur_hash3_x64_64;
110110
} else {
@@ -182,7 +182,7 @@ class BloomFilter {
182182
add_hash(code);
183183
}
184184

185-
bool test_bytes(const char* buf, size_t size) const {
185+
virtual bool test_bytes(const char* buf, size_t size) const {
186186
if (buf == nullptr) {
187187
return *_has_null;
188188
}
@@ -200,7 +200,7 @@ class BloomFilter {
200200

201201
virtual size_t size() const { return _size; }
202202

203-
void set_has_null(bool has_null) { *_has_null = has_null; }
203+
virtual void set_has_null(bool has_null) { *_has_null = has_null; }
204204

205205
virtual bool has_null() const { return *_has_null; }
206206

@@ -239,7 +239,6 @@ class BloomFilter {
239239
// is this bf used for write
240240
bool _is_write = false;
241241

242-
private:
243242
std::function<void(const void*, const int64_t, const uint64_t, void*)> _hash_func;
244243
};
245244

be/src/util/hash_util.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include <gen_cpp/Types_types.h>
2424
#include <xxh3.h>
25+
#include <xxhash.h>
2526
#include <zlib.h>
2627

2728
#include <bit>
@@ -362,6 +363,15 @@ class HashUtil {
362363
return XXH3_64bits_withSeed(reinterpret_cast<const char*>(&INT_VALUE), sizeof(int), seed);
363364
}
364365

366+
// xxHash64 (classic XXH64, not XXH3) of a byte range — the hash used by
// Parquet block-split bloom filters.
static xxh_u64 xxhash64_compat_with_seed(const char* s, size_t len, xxh_u64 seed) {
    return XXH64(static_cast<const void*>(s), len, seed);
}
369+
370+
// xxHash64 of the fixed NULL sentinel (an int-sized zero), mirroring the
// XXH3-based xxhash64_null_with_seed above but with the classic XXH64.
static xxh_u64 xxhash64_compat_null_with_seed(xxh_u64 seed) {
    static const int INT_VALUE = 0;
    return XXH64(static_cast<const void*>(&INT_VALUE), sizeof(INT_VALUE), seed);
}
374+
365375
#if defined(__clang__)
366376
#pragma clang diagnostic pop
367377
#endif
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include <glog/logging.h>
19+
20+
#include <cstring>
21+
22+
#include "vec/exec/format/parquet/vparquet_column_reader.h"
23+
24+
namespace doris {
25+
namespace vectorized {
26+
27+
// for write
28+
Status ParquetBlockSplitBloomFilter::init(uint64_t filter_size,
29+
segment_v2::HashStrategyPB strategy) {
30+
if (strategy == XX_HASH_64) {
31+
_hash_func = [](const void* buf, const int64_t len, const uint64_t seed, void* out) {
32+
auto h =
33+
HashUtil::xxhash64_compat_with_seed(reinterpret_cast<const char*>(buf), len, 0);
34+
*reinterpret_cast<uint64_t*>(out) = h;
35+
};
36+
} else {
37+
return Status::InvalidArgument("invalid strategy:{}", strategy);
38+
}
39+
_num_bytes = filter_size;
40+
_size = _num_bytes;
41+
_data = new char[_size];
42+
memset(_data, 0, _size);
43+
_has_null = nullptr;
44+
_is_write = true;
45+
g_write_bloom_filter_num << 1;
46+
g_write_bloom_filter_total_bytes << _size;
47+
g_total_bloom_filter_total_bytes << _size;
48+
return Status::OK();
49+
}
50+
51+
// for read
52+
// use deep copy to acquire the data
53+
Status ParquetBlockSplitBloomFilter::init(const char* buf, size_t size,
54+
segment_v2::HashStrategyPB strategy) {
55+
if (size <= 1) {
56+
return Status::InvalidArgument("invalid size:{}", size);
57+
}
58+
DCHECK(size > 1);
59+
if (strategy == XX_HASH_64) {
60+
_hash_func = [](const void* buf, const int64_t len, const uint64_t seed, void* out) {
61+
auto h =
62+
HashUtil::xxhash64_compat_with_seed(reinterpret_cast<const char*>(buf), len, 0);
63+
*reinterpret_cast<uint64_t*>(out) = h;
64+
};
65+
} else {
66+
return Status::InvalidArgument("invalid strategy:{}", strategy);
67+
}
68+
if (buf == nullptr) {
69+
return Status::InvalidArgument("buf is nullptr");
70+
}
71+
72+
_data = new char[size];
73+
memcpy(_data, buf, size);
74+
_size = size;
75+
_num_bytes = _size;
76+
_has_null = nullptr;
77+
g_read_bloom_filter_num << 1;
78+
g_read_bloom_filter_total_bytes << _size;
79+
g_total_bloom_filter_total_bytes << _size;
80+
return Status::OK();
81+
}
82+
83+
void ParquetBlockSplitBloomFilter::add_bytes(const char* buf, size_t size) {
84+
DCHECK(buf != nullptr) << "Parquet bloom filter does not track nulls";
85+
uint64_t code = hash(buf, size);
86+
add_hash(code);
87+
}
88+
89+
bool ParquetBlockSplitBloomFilter::test_bytes(const char* buf, size_t size) const {
90+
uint64_t code = hash(buf, size);
91+
return test_hash(code);
92+
}
93+
94+
// Parquet bloom filters carry no NULL flag; this override only asserts that
// nobody tries to record one. Intentionally a no-op in release builds.
void ParquetBlockSplitBloomFilter::set_has_null(bool has_null) {
    DCHECK(!has_null) << "Parquet bloom filter does not track nulls";
}
97+
98+
void ParquetBlockSplitBloomFilter::add_hash(uint64_t hash) {
99+
DCHECK(_num_bytes >= BYTES_PER_BLOCK);
100+
const uint32_t bucket_index =
101+
static_cast<uint32_t>((hash >> 32) * (_num_bytes / BYTES_PER_BLOCK) >> 32);
102+
uint32_t key = static_cast<uint32_t>(hash);
103+
uint32_t* bitset32 = reinterpret_cast<uint32_t*>(_data);
104+
105+
// Calculate mask for bucket.
106+
BlockMask block_mask;
107+
_set_masks(key, block_mask);
108+
109+
for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
110+
bitset32[bucket_index * BITS_SET_PER_BLOCK + i] |= block_mask.item[i];
111+
}
112+
}
113+
114+
bool ParquetBlockSplitBloomFilter::test_hash(uint64_t hash) const {
115+
const uint32_t bucket_index =
116+
static_cast<uint32_t>((hash >> 32) * (_num_bytes / BYTES_PER_BLOCK) >> 32);
117+
uint32_t key = static_cast<uint32_t>(hash);
118+
uint32_t* bitset32 = reinterpret_cast<uint32_t*>(_data);
119+
120+
// Calculate masks for bucket.
121+
BlockMask block_mask;
122+
_set_masks(key, block_mask);
123+
124+
for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
125+
uint32_t bit_val = bitset32[BITS_SET_PER_BLOCK * bucket_index + i];
126+
if (0 == (bit_val & block_mask.item[i])) {
127+
return false;
128+
}
129+
}
130+
return true;
131+
}
132+
133+
} // namespace vectorized
134+
} // namespace doris

0 commit comments

Comments
 (0)