Skip to content

Commit

Permalink
[Refactor] Rename JoinRuntimeFilter to RuntimeFilter (StarRocks#55587)
Browse files Browse the repository at this point in the history
Signed-off-by: trueeyu <lxhhust350@qq.com>
  • Loading branch information
trueeyu authored Feb 6, 2025
1 parent b5db7fe commit df1cb6a
Show file tree
Hide file tree
Showing 28 changed files with 176 additions and 178 deletions.
2 changes: 1 addition & 1 deletion be/src/bench/runtime_filter_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ static void do_benchmark_hash_partitioned(benchmark::State& state, TRuntimeFilte
++num_rows_per_partitions[hash_values[i]];
}

JoinRuntimeFilter::RunningContext running_ctx;
RuntimeFilter::RunningContext running_ctx;
running_ctx.selection.assign(num_rows, 2);
running_ctx.use_merged_selection = false;
running_ctx.compatibility = true;
Expand Down
2 changes: 1 addition & 1 deletion be/src/connector/connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Status DataSource::parse_runtime_filters(RuntimeState* state) {
for (const auto& item : _runtime_filters->descriptors()) {
RuntimeFilterProbeDescriptor* probe = item.second;
DCHECK(runtime_bloom_filter_eval_context.driver_sequence != -1);
const JoinRuntimeFilter* filter = probe->runtime_filter(runtime_bloom_filter_eval_context.driver_sequence);
const RuntimeFilter* filter = probe->runtime_filter(runtime_bloom_filter_eval_context.driver_sequence);
if (filter == nullptr) continue;
SlotId slot_id;
if (!probe->is_probe_slot_ref(&slot_id)) continue;
Expand Down
8 changes: 4 additions & 4 deletions be/src/exec/chunks_sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class ChunksSorter {
virtual Status get_next(ChunkPtr* chunk, bool* eos) = 0;

// RuntimeFilter generate by ChunkSorter only works in TopNSorter and HeapSorter
virtual std::vector<JoinRuntimeFilter*>* runtime_filters(ObjectPool* pool) { return nullptr; }
virtual std::vector<RuntimeFilter*>* runtime_filters(ObjectPool* pool) { return nullptr; }

// Return accurate output rows of this operator
virtual size_t get_output_rows() const = 0;
Expand Down Expand Up @@ -179,8 +179,8 @@ class ChunksSorter {
namespace detail {
struct SortRuntimeFilterBuilder {
template <LogicalType ltype>
JoinRuntimeFilter* operator()(ObjectPool* pool, const ColumnPtr& column, int rid, bool asc, bool null_first,
bool is_close_interval) {
RuntimeFilter* operator()(ObjectPool* pool, const ColumnPtr& column, int rid, bool asc, bool null_first,
bool is_close_interval) {
bool need_null = false;
if (null_first) {
need_null = true;
Expand Down Expand Up @@ -220,7 +220,7 @@ struct SortRuntimeFilterBuilder {

struct SortRuntimeFilterUpdater {
template <LogicalType ltype>
std::nullptr_t operator()(JoinRuntimeFilter* filter, const ColumnPtr& column, int rid, bool asc, bool null_first,
std::nullptr_t operator()(RuntimeFilter* filter, const ColumnPtr& column, int rid, bool asc, bool null_first,
bool is_close_interval) {
if (null_first) {
if (column->is_null(rid)) {
Expand Down
8 changes: 4 additions & 4 deletions be/src/exec/chunks_sorter_heap_sort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Status ChunksSorterHeapSort::get_next(ChunkPtr* chunk, bool* eos) {
return Status::OK();
}

std::vector<JoinRuntimeFilter*>* ChunksSorterHeapSort::runtime_filters(ObjectPool* pool) {
std::vector<RuntimeFilter*>* ChunksSorterHeapSort::runtime_filters(ObjectPool* pool) {
if (_sort_heap == nullptr || _sort_heap->size() < _number_of_rows_to_sort()) {
return nullptr;
}
Expand All @@ -178,9 +178,9 @@ std::vector<JoinRuntimeFilter*>* ChunksSorterHeapSort::runtime_filters(ObjectPoo
bool null_first = _sort_desc.descs[0].is_null_first();

if (_runtime_filter.empty()) {
auto rf = type_dispatch_predicate<JoinRuntimeFilter*>(
(*_sort_exprs)[0]->root()->type().type, false, detail::SortRuntimeFilterBuilder(), pool,
top_cursor_column, cursor_rid, asc, null_first, is_close_interval);
auto rf = type_dispatch_predicate<RuntimeFilter*>((*_sort_exprs)[0]->root()->type().type, false,
detail::SortRuntimeFilterBuilder(), pool, top_cursor_column,
cursor_rid, asc, null_first, is_close_interval);
if (rf == nullptr) {
return nullptr;
} else {
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/chunks_sorter_heap_sort.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class ChunksSorterHeapSort final : public ChunksSorter {
Status update(RuntimeState* state, const ChunkPtr& chunk) override;
Status do_done(RuntimeState* state) override;
Status get_next(ChunkPtr* chunk, bool* eos) override;
std::vector<JoinRuntimeFilter*>* runtime_filters(ObjectPool* pool) override;
std::vector<RuntimeFilter*>* runtime_filters(ObjectPool* pool) override;
int64_t mem_usage() const override {
if (_sort_heap == nullptr || _sort_heap->empty()) {
return 0;
Expand All @@ -261,7 +261,7 @@ class ChunksSorterHeapSort final : public ChunksSorter {
template <LogicalType TYPE>
void _do_filter_data_for_type(detail::ChunkHolder* chunk_holder, Filter* filter, int row_sz);

std::vector<JoinRuntimeFilter*> _runtime_filter;
std::vector<RuntimeFilter*> _runtime_filter;

using CursorContainer = std::vector<detail::ChunkRowCursor>;
using CommonCursorSortHeap =
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/chunks_sorter_topn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ Status ChunksSorterTopn::do_done(RuntimeState* state) {
return Status::OK();
}

std::vector<JoinRuntimeFilter*>* ChunksSorterTopn::runtime_filters(ObjectPool* pool) {
std::vector<RuntimeFilter*>* ChunksSorterTopn::runtime_filters(ObjectPool* pool) {
if (!_init_merged_segment) {
return nullptr;
}
Expand All @@ -127,7 +127,7 @@ std::vector<JoinRuntimeFilter*>* ChunksSorterTopn::runtime_filters(ObjectPool* p
bool null_first = _sort_desc.descs[0].is_null_first();

if (_runtime_filter.empty()) {
auto* rf = type_dispatch_predicate<JoinRuntimeFilter*>(
auto* rf = type_dispatch_predicate<RuntimeFilter*>(
(*_sort_exprs)[0]->root()->type().type, false, detail::SortRuntimeFilterBuilder(), pool,
order_by_column, current_max_value_row_id, asc, null_first, is_close_interval);
if (rf == nullptr) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/chunks_sorter_topn.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class ChunksSorterTopn : public ChunksSorter {

void setup_runtime(RuntimeState* state, RuntimeProfile* profile, MemTracker* parent_mem_tracker) override;

std::vector<JoinRuntimeFilter*>* runtime_filters(ObjectPool* pool) override;
std::vector<RuntimeFilter*>* runtime_filters(ObjectPool* pool) override;

private:
size_t _get_number_of_rows_to_sort() const { return _offset + _limit; }
Expand Down Expand Up @@ -143,7 +143,7 @@ class ChunksSorterTopn : public ChunksSorter {
const size_t _offset;
const TTopNType::type _topn_type;

std::vector<JoinRuntimeFilter*> _runtime_filter;
std::vector<RuntimeFilter*> _runtime_filter;

RuntimeProfile::Counter* _sort_filter_rows = nullptr;
RuntimeProfile::Counter* _sort_filter_timer = nullptr;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/hash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ Status HashJoinNode::_do_publish_runtime_filters(RuntimeState* state, int64_t li
// skip if ht.size() > limit and it's only for local.
if (!rf_desc->has_remote_targets() && _ht.get_row_count() > limit) continue;
LogicalType build_type = rf_desc->build_expr_type();
JoinRuntimeFilter* filter = RuntimeFilterHelper::create_runtime_bloom_filter(_pool, build_type);
RuntimeFilter* filter = RuntimeFilterHelper::create_runtime_bloom_filter(_pool, build_type);
if (filter == nullptr) continue;
filter->set_join_mode(rf_desc->join_mode());
filter->init(_ht.get_row_count());
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/hash_joiner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -582,12 +582,12 @@ Status HashJoiner::_create_runtime_bloom_filters(RuntimeState* state, int64_t li
columns.push_back(column);
}

MutableJoinRuntimeFilterPtr filter = nullptr;
MutableRuntimeFilterPtr filter = nullptr;
auto multi_partitioned = rf_desc->layout().pipeline_level_multi_partitioned();
multi_partitioned |= rf_desc->num_colocate_partition() > 0;
if (multi_partitioned) {
LogicalType build_type = rf_desc->build_expr_type();
filter = std::shared_ptr<JoinRuntimeFilter>(
filter = std::shared_ptr<RuntimeFilter>(
RuntimeFilterHelper::create_runtime_bloom_filter(nullptr, build_type));
if (filter == nullptr) {
_runtime_bloom_filter_build_params.emplace_back();
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/olap_scan_prepare.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -578,8 +578,8 @@ Status ChunkPredicateBuilder<E, Type>::normalize_binary_predicate(const SlotDesc

template <BoxedExprType E, CompoundNodeType Type>
template <LogicalType SlotType, LogicalType MappingType, template <class> class Decoder, class... Args>
void ChunkPredicateBuilder<E, Type>::normalized_rf_with_null(const JoinRuntimeFilter* rf,
const SlotDescriptor* slot_desc, Args&&... args) {
void ChunkPredicateBuilder<E, Type>::normalized_rf_with_null(const RuntimeFilter* rf, const SlotDescriptor* slot_desc,
Args&&... args) {
DCHECK(Type == CompoundNodeType::AND);
using RFColumnPredicateBuilder = detail::RuntimeColumnPredicateBuilder;
ObjectPool* pool = _opts.obj_pool;
Expand Down Expand Up @@ -686,7 +686,7 @@ Status ChunkPredicateBuilder<E, Type>::normalize_join_runtime_filter(const SlotD
// bloom runtime filter
for (const auto& it : _opts.runtime_filters->descriptors()) {
RuntimeFilterProbeDescriptor* desc = it.second;
const JoinRuntimeFilter* rf = desc->runtime_filter(_opts.driver_sequence);
const RuntimeFilter* rf = desc->runtime_filter(_opts.driver_sequence);
using RangeType = ColumnValueRange<RangeValueType>;
using ValueType = typename RunTimeTypeTraits<SlotType>::CppType;
SlotId slot_id;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/olap_scan_prepare.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class ChunkPredicateBuilder {
const UnarrivedRuntimeFilterList& unarrived_runtime_filters() { return rt_ranger_params; }

template <LogicalType SlotType, LogicalType MappingType, template <class> class Decoder, class... Args>
void normalized_rf_with_null(const JoinRuntimeFilter* rf, const SlotDescriptor* slot_desc, Args&&... args);
void normalized_rf_with_null(const RuntimeFilter* rf, const SlotDescriptor* slot_desc, Args&&... args);

private:
const ScanConjunctsManagerOptions& _opts;
Expand Down
10 changes: 5 additions & 5 deletions be/src/exec/pipeline/runtime_filter_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ using RuntimeInFilter = starrocks::ExprContext;
using RuntimeBloomFilter = starrocks::RuntimeFilterBuildDescriptor;
using RuntimeBloomFilterProbeDescriptor = starrocks::RuntimeFilterProbeDescriptor;
using RuntimeBloomFilterProbeDescriptorPtr = RuntimeBloomFilterProbeDescriptor*;
using RuntimeBloomFilterRunningContext = starrocks::JoinRuntimeFilter::RunningContext;
using RuntimeBloomFilterRunningContext = RuntimeFilter::RunningContext;
using RuntimeInFilterPtr = RuntimeInFilter*;
using RuntimeBloomFilterPtr = RuntimeBloomFilter*;
using RuntimeInFilters = std::vector<RuntimeInFilterPtr>;
Expand All @@ -50,7 +50,7 @@ using OptRuntimeBloomFilterBuildParams = std::vector<std::optional<RuntimeBloomF
// Parameters used to build runtime bloom-filters.
struct RuntimeBloomFilterBuildParam {
RuntimeBloomFilterBuildParam(bool multi_partitioned, bool eq_null, bool is_empty, std::vector<ColumnPtr> columns,
MutableJoinRuntimeFilterPtr runtime_filter)
MutableRuntimeFilterPtr runtime_filter)
: multi_partitioned(multi_partitioned),
eq_null(eq_null),
is_empty(is_empty),
Expand All @@ -60,7 +60,7 @@ struct RuntimeBloomFilterBuildParam {
bool eq_null;
bool is_empty;
std::vector<ColumnPtr> columns;
MutableJoinRuntimeFilterPtr runtime_filter;
MutableRuntimeFilterPtr runtime_filter;
};

// RuntimeFilterCollector contains runtime in-filters and bloom-filters, it is stored in RuntimeFilerHub
Expand Down Expand Up @@ -402,7 +402,7 @@ class PartialRuntimeFilterMerger {
// skip if ht.size() > limit, and it's only for local.
if (!desc->has_remote_targets() && row_count > _local_rf_limit) continue;
LogicalType build_type = desc->build_expr_type();
JoinRuntimeFilter* filter = RuntimeFilterHelper::create_runtime_bloom_filter(_pool, build_type);
RuntimeFilter* filter = RuntimeFilterHelper::create_runtime_bloom_filter(_pool, build_type);
if (filter == nullptr) continue;

if (desc->has_remote_targets() && row_count > _global_rf_limit) {
Expand Down Expand Up @@ -479,7 +479,7 @@ class PartialRuntimeFilterMerger {
// skip if ht.size() > limit, and it's only for local.
if (!desc->has_remote_targets() && row_count > _local_rf_limit) continue;
LogicalType build_type = desc->build_expr_type();
JoinRuntimeFilter* filter = RuntimeFilterHelper::create_runtime_bloom_filter(_pool, build_type);
RuntimeFilter* filter = RuntimeFilterHelper::create_runtime_bloom_filter(_pool, build_type);
if (filter == nullptr) continue;
if (desc->has_remote_targets() && row_count > _global_rf_limit) {
filter->clear_bf();
Expand Down
4 changes: 2 additions & 2 deletions be/src/exprs/min_max_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class MinMaxPredicate : public Expr {

class MinMaxPredicateBuilder {
public:
MinMaxPredicateBuilder(ObjectPool* pool, SlotId slot_id, const JoinRuntimeFilter* filter)
MinMaxPredicateBuilder(ObjectPool* pool, SlotId slot_id, const RuntimeFilter* filter)
: _pool(pool), _slot_id(slot_id), _filter(filter) {}

template <LogicalType ltype>
Expand All @@ -147,7 +147,7 @@ class MinMaxPredicateBuilder {
private:
ObjectPool* _pool;
SlotId _slot_id;
const JoinRuntimeFilter* _filter;
const RuntimeFilter* _filter;
};

} // namespace starrocks
10 changes: 5 additions & 5 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ void SimdBlockFilter::clear() {
}
}

size_t JoinRuntimeFilter::max_serialized_size() const {
size_t RuntimeFilter::max_serialized_size() const {
// todo(yan): noted that it's not serialize compatible with 32-bit and 64-bit.
auto num_partitions = _hash_partition_bf.size();
size_t size = sizeof(_has_null) + sizeof(_size) + sizeof(num_partitions) + sizeof(_join_mode);
Expand All @@ -145,7 +145,7 @@ size_t JoinRuntimeFilter::max_serialized_size() const {
return size;
}

size_t JoinRuntimeFilter::serialize(int serialize_version, uint8_t* data) const {
size_t RuntimeFilter::serialize(int serialize_version, uint8_t* data) const {
size_t offset = 0;
auto num_partitions = _hash_partition_bf.size();
#define JRF_COPY_FIELD(field) \
Expand All @@ -168,7 +168,7 @@ size_t JoinRuntimeFilter::serialize(int serialize_version, uint8_t* data) const
return offset;
}

size_t JoinRuntimeFilter::deserialize(int serialize_version, const uint8_t* data) {
size_t RuntimeFilter::deserialize(int serialize_version, const uint8_t* data) {
size_t offset = 0;
size_t num_partitions = 0;
#define JRF_COPY_FIELD(field) \
Expand All @@ -193,7 +193,7 @@ size_t JoinRuntimeFilter::deserialize(int serialize_version, const uint8_t* data
return offset;
}

bool JoinRuntimeFilter::check_equal(const JoinRuntimeFilter& rf) const {
bool RuntimeFilter::check_equal(const RuntimeFilter& rf) const {
auto lhs_num_partitions = _hash_partition_bf.size();
auto rhs_num_partitions = rf._hash_partition_bf.size();
bool first = (_has_null == rf._has_null && _size == rf._size && lhs_num_partitions == rhs_num_partitions &&
Expand All @@ -211,7 +211,7 @@ bool JoinRuntimeFilter::check_equal(const JoinRuntimeFilter& rf) const {
return true;
}

void JoinRuntimeFilter::clear_bf() {
void RuntimeFilter::clear_bf() {
if (_hash_partition_bf.empty()) {
_bf.clear();
} else {
Expand Down
Loading

0 comments on commit df1cb6a

Please sign in to comment.