Skip to content

Commit

Permalink
[refactor](runtimefilter) do not use QueryContext in runtime filter (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored Dec 20, 2023
1 parent 4c0080e commit c26c0c3
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 118 deletions.
47 changes: 7 additions & 40 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,24 +298,6 @@ class RuntimePredicateWrapper {
_filter_type(type),
_filter_id(filter_id) {}

RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool,
const RuntimeFilterParams* params)
: _query_ctx(query_ctx),
_be_exec_version(_query_ctx->be_exec_version()),
_pool(pool),
_column_return_type(params->column_return_type),
_filter_type(params->filter_type),
_filter_id(params->filter_id) {}
// for a 'tmp' runtime predicate wrapper
// only could called assign method or as a param for merge
RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool, PrimitiveType column_type,
RuntimeFilterType type, uint32_t filter_id)
: _query_ctx(query_ctx),
_be_exec_version(_query_ctx->be_exec_version()),
_pool(pool),
_column_return_type(column_type),
_filter_type(type),
_filter_id(filter_id) {}
// init runtime filter wrapper
// alloc memory to init runtime filter function
Status init(const RuntimeFilterParams* params) {
Expand Down Expand Up @@ -946,7 +928,6 @@ class RuntimePredicateWrapper {

private:
RuntimeFilterParamsContext* _state;
QueryContext* _query_ctx;
int _be_exec_version;
ObjectPool* _pool;

Expand All @@ -971,15 +952,6 @@ Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* poo
return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
}

Status IRuntimeFilter::create(QueryContext* query_ctx, ObjectPool* pool,
const TRuntimeFilterDesc* desc, const TQueryOptions* query_options,
const RuntimeFilterRole role, int node_id, IRuntimeFilter** res,
bool build_bf_exactly) {
*res = pool->add(new IRuntimeFilter(query_ctx, pool, desc));
(*res)->set_role(role);
return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
}

void IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context) {
context = _wrapper->_context;
}
Expand Down Expand Up @@ -1036,10 +1008,8 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr

bool IRuntimeFilter::await() {
DCHECK(is_consumer());
auto execution_timeout = _state == nullptr ? _query_ctx->execution_timeout() * 1000
: _state->execution_timeout * 1000;
auto runtime_filter_wait_time_ms = _state == nullptr ? _query_ctx->runtime_filter_wait_time_ms()
: _state->runtime_filter_wait_time_ms;
auto execution_timeout = _state->execution_timeout * 1000;
auto runtime_filter_wait_time_ms = _state->runtime_filter_wait_time_ms;
// bitmap filter is precise filter and only filter once, so it must be applied.
int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
? execution_timeout
Expand Down Expand Up @@ -1215,11 +1185,8 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
_probe_expr = iter->second;
}

if (_state) {
_wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, &params));
} else {
_wrapper = _pool->add(new RuntimePredicateWrapper(_query_ctx, _pool, &params));
}
_wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, &params));

return _wrapper->init(&params);
}

Expand Down Expand Up @@ -1247,15 +1214,15 @@ Status IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state,
return _create_wrapper(state, param, pool, wrapper);
}

Status IRuntimeFilter::create_wrapper(QueryContext* query_ctx,
Status IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state,
const UpdateRuntimeFilterParamsV2* param, ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
int filter_type = param->request->filter_type();
PrimitiveType column_type = PrimitiveType::INVALID_TYPE;
if (param->request->has_in_filter()) {
column_type = to_primitive_type(param->request->in_filter().column_type());
}
wrapper->reset(new RuntimePredicateWrapper(query_ctx, pool, column_type, get_type(filter_type),
wrapper->reset(new RuntimePredicateWrapper(state, pool, column_type, get_type(filter_type),
param->request->filter_id()));

switch (filter_type) {
Expand Down Expand Up @@ -1683,7 +1650,7 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param,
}

std::unique_ptr<RuntimePredicateWrapper> tmp_wrapper;
RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_query_ctx, param, _pool, &tmp_wrapper));
RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, param, _pool, &tmp_wrapper));
auto origin_type = _wrapper->get_real_type();
RETURN_IF_ERROR(_wrapper->merge(tmp_wrapper.get()));
if (origin_type != _wrapper->get_real_type()) {
Expand Down
33 changes: 3 additions & 30 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,39 +208,13 @@ class IRuntimeFilter {
to_string(_runtime_filter_type))),
_profile(new RuntimeProfile(_name)) {}

IRuntimeFilter(QueryContext* query_ctx, ObjectPool* pool, const TRuntimeFilterDesc* desc)
: _query_ctx(query_ctx),
_pool(pool),
_filter_id(desc->filter_id),
_is_broadcast_join(true),
_has_remote_target(false),
_has_local_target(false),
_rf_state(RuntimeFilterState::NOT_READY),
_rf_state_atomic(RuntimeFilterState::NOT_READY),
_role(RuntimeFilterRole::PRODUCER),
_expr_order(-1),
_always_true(false),
_is_ignored(false),
registration_time_(MonotonicMillis()),
_wait_infinitely(query_ctx->runtime_filter_wait_infinitely()),
_rf_wait_time_ms(query_ctx->runtime_filter_wait_time_ms()),
_enable_pipeline_exec(query_ctx->enable_pipeline_exec()),
_runtime_filter_type(get_runtime_filter_type(desc)),
_name(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
to_string(_runtime_filter_type))),
_profile(new RuntimeProfile(_name)) {}

~IRuntimeFilter() = default;

static Status create(RuntimeFilterParamsContext* state, ObjectPool* pool,
const TRuntimeFilterDesc* desc, const TQueryOptions* query_options,
const RuntimeFilterRole role, int node_id, IRuntimeFilter** res,
bool build_bf_exactly = false);

static Status create(QueryContext* query_ctx, ObjectPool* pool, const TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const RuntimeFilterRole role,
int node_id, IRuntimeFilter** res, bool build_bf_exactly = false);

void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context);
Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context);

Expand Down Expand Up @@ -307,8 +281,8 @@ class IRuntimeFilter {
static Status create_wrapper(RuntimeFilterParamsContext* state,
const UpdateRuntimeFilterParams* param, ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>* wrapper);
static Status create_wrapper(QueryContext* query_ctx, const UpdateRuntimeFilterParamsV2* param,
ObjectPool* pool,
static Status create_wrapper(RuntimeFilterParamsContext* state,
const UpdateRuntimeFilterParamsV2* param, ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>* wrapper);
void change_to_bloom_filter();
Status init_bloom_filter(const size_t build_bf_cardinality);
Expand Down Expand Up @@ -370,7 +344,7 @@ class IRuntimeFilter {
int32_t wait_time_ms() const {
int32_t res = 0;
if (wait_infinitely()) {
res = _state == nullptr ? _query_ctx->execution_timeout() : _state->execution_timeout;
res = _state->execution_timeout;
// Convert to ms
res *= 1000;
} else {
Expand Down Expand Up @@ -424,7 +398,6 @@ class IRuntimeFilter {
}

RuntimeFilterParamsContext* _state = nullptr;
QueryContext* _query_ctx = nullptr;
ObjectPool* _pool = nullptr;
// _wrapper is a runtime filter function wrapper
// _wrapper should alloc from _pool
Expand Down
4 changes: 2 additions & 2 deletions be/src/exprs/runtime_filter_rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ Status IRuntimeFilter::push_to_remote(RuntimeFilterParamsContext* state,
pquery_id->set_lo(_state->query_id.lo());

auto pfragment_instance_id = _rpc_context->request.mutable_fragment_instance_id();
pfragment_instance_id->set_hi(state->fragment_instance_id.hi());
pfragment_instance_id->set_lo(state->fragment_instance_id.lo());
pfragment_instance_id->set_hi(state->fragment_instance_id().hi());
pfragment_instance_id->set_lo(state->fragment_instance_id().lo());

_rpc_context->request.set_filter_id(_filter_id);
_rpc_context->request.set_opt_remote_rf(opt_remote_rf);
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,8 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
filterparams->query_id.set_hi(_runtime_state->query_id().hi);
filterparams->query_id.set_lo(_runtime_state->query_id().lo);

filterparams->fragment_instance_id.set_hi(fragment_instance_id.hi);
filterparams->fragment_instance_id.set_lo(fragment_instance_id.lo);
filterparams->_fragment_instance_id.set_hi(fragment_instance_id.hi);
filterparams->_fragment_instance_id.set_lo(fragment_instance_id.lo);
filterparams->be_exec_version = _runtime_state->be_exec_version();
filterparams->query_ctx = _query_ctx.get();
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
timeout_second(-1),
_query_id(query_id),
_exec_env(exec_env),
_runtime_filter_mgr(new RuntimeFilterMgr(TUniqueId(), this)),
_query_options(query_options) {
_start_time = VecDateTimeValue::local_time();
_shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
_shared_scanner_controller.reset(new vectorized::SharedScannerController());
_execution_dependency.reset(new pipeline::Dependency(-1, -1, "ExecutionDependency", this));
_runtime_filter_mgr.reset(
new RuntimeFilterMgr(TUniqueId(), RuntimeFilterParamsContext::create(this)));
}

QueryContext::~QueryContext() {
Expand Down
44 changes: 39 additions & 5 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParams
_state->runtime_filter_mgr = this;
}

RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx)
: _query_ctx(query_ctx) {}

Status RuntimeFilterMgr::init() {
_tracker = std::make_unique<MemTracker>("RuntimeFilterMgr",
ExecEnv::GetInstance()->experimental_mem_tracker());
Expand Down Expand Up @@ -120,7 +117,6 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc
if (desc.__isset.opt_remote_rf && desc.opt_remote_rf && desc.has_remote_targets &&
desc.type == TRuntimeFilterType::BLOOM) {
// if this runtime filter has remote target (e.g. need merge), we reuse the runtime filter between all instances
DCHECK(_query_ctx != nullptr);

iter = _consumer_map.find(key);
if (iter != _consumer_map.end()) {
Expand All @@ -131,7 +127,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc
}
}
IRuntimeFilter* filter;
RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, &_query_ctx->obj_pool, &desc, &options,
RETURN_IF_ERROR(IRuntimeFilter::create(_state, _state->obj_pool(), &desc, &options,
RuntimeFilterRole::CONSUMER, node_id, &filter,
build_bf_exactly));
_consumer_map[key].emplace_back(node_id, filter);
Expand Down Expand Up @@ -569,4 +565,42 @@ void runtime_filter_merge_entity_close(RuntimeFilterMergeController* controller,
delete entity;
}

RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* state) {
RuntimeFilterParamsContext* params = state->obj_pool()->add(new RuntimeFilterParamsContext());
params->runtime_filter_wait_infinitely = state->runtime_filter_wait_infinitely();
params->runtime_filter_wait_time_ms = state->runtime_filter_wait_time_ms();
params->enable_pipeline_exec = state->enable_pipeline_exec();
params->execution_timeout = state->execution_timeout();
params->runtime_filter_mgr = state->runtime_filter_mgr();
params->exec_env = state->exec_env();
params->query_id.set_hi(state->query_id().hi);
params->query_id.set_lo(state->query_id().lo);

params->_fragment_instance_id.set_hi(state->fragment_instance_id().hi);
params->_fragment_instance_id.set_lo(state->fragment_instance_id().lo);
params->be_exec_version = state->be_exec_version();
params->query_ctx = state->get_query_ctx();
return params;
}

RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(QueryContext* query_ctx) {
RuntimeFilterParamsContext* params = query_ctx->obj_pool.add(new RuntimeFilterParamsContext());
params->runtime_filter_wait_infinitely = query_ctx->runtime_filter_wait_infinitely();
params->runtime_filter_wait_time_ms = query_ctx->runtime_filter_wait_time_ms();
params->enable_pipeline_exec = query_ctx->enable_pipeline_exec();
params->execution_timeout = query_ctx->execution_timeout();
params->runtime_filter_mgr = query_ctx->runtime_filter_mgr();
params->exec_env = query_ctx->exec_env();
params->query_id.set_hi(query_ctx->query_id().hi);
params->query_id.set_lo(query_ctx->query_id().lo);

// params->fragment_instance_id.set_hi(state->fragment_instance_id().hi);
// params->fragment_instance_id.set_lo(state->fragment_instance_id().lo);
params->be_exec_version = query_ctx->be_exec_version();
params->query_ctx = query_ctx;
params->_obj_pool = &query_ctx->obj_pool;
params->_is_global = true;
return params;
}

} // namespace doris
38 changes: 35 additions & 3 deletions be/src/runtime/runtime_filter_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ enum class RuntimeFilterRole;
class RuntimePredicateWrapper;
class QueryContext;
struct RuntimeFilterParamsContext;
class ExecEnv;

/// producer:
/// Filter filter;
Expand All @@ -68,8 +69,6 @@ class RuntimeFilterMgr {
public:
RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParamsContext* state);

RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx);

~RuntimeFilterMgr() = default;

Status init();
Expand Down Expand Up @@ -109,7 +108,6 @@ class RuntimeFilterMgr {
std::map<int32_t, IRuntimeFilter*> _producer_map;

RuntimeFilterParamsContext* _state = nullptr;
QueryContext* _query_ctx = nullptr;
std::unique_ptr<MemTracker> _tracker;
ObjectPool _pool;

Expand Down Expand Up @@ -227,4 +225,38 @@ using runtime_filter_merge_entity_closer = std::function<void(RuntimeFilterMerge
void runtime_filter_merge_entity_close(RuntimeFilterMergeController* controller,
RuntimeFilterMergeControllerEntity* entity);

//There are two types of runtime filters:
// one is global, originating from QueryContext,
// and the other is local, originating from RuntimeState.
// In practice, we have already distinguished between them through UpdateRuntimeFilterParamsV2/V1.
// RuntimeState/QueryContext is only used to store runtime_filter_wait_time_ms and enable_pipeline_exec...

/// TODO: Consider adding checks for global/local.
struct RuntimeFilterParamsContext {
RuntimeFilterParamsContext() = default;
static RuntimeFilterParamsContext* create(RuntimeState* state);
static RuntimeFilterParamsContext* create(QueryContext* query_ctx);

bool runtime_filter_wait_infinitely;
int32_t runtime_filter_wait_time_ms;
bool enable_pipeline_exec;
int32_t execution_timeout;
RuntimeFilterMgr* runtime_filter_mgr;
ExecEnv* exec_env;
PUniqueId query_id;
PUniqueId _fragment_instance_id;
int be_exec_version;
QueryContext* query_ctx;
QueryContext* get_query_ctx() const { return query_ctx; }
ObjectPool* _obj_pool;
bool _is_global = false;
PUniqueId fragment_instance_id() const {
DCHECK(!_is_global);
return _fragment_instance_id;
}
ObjectPool* obj_pool() const {
DCHECK(_is_global);
return _obj_pool;
}
};
} // namespace doris
17 changes: 0 additions & 17 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -509,21 +509,4 @@ bool RuntimeState::enable_page_cache() const {
(_query_options.__isset.enable_page_cache && _query_options.enable_page_cache);
}

RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* state) {
RuntimeFilterParamsContext* params = state->obj_pool()->add(new RuntimeFilterParamsContext());
params->runtime_filter_wait_infinitely = state->runtime_filter_wait_infinitely();
params->runtime_filter_wait_time_ms = state->runtime_filter_wait_time_ms();
params->enable_pipeline_exec = state->enable_pipeline_exec();
params->execution_timeout = state->execution_timeout();
params->runtime_filter_mgr = state->runtime_filter_mgr();
params->exec_env = state->exec_env();
params->query_id.set_hi(state->query_id().hi);
params->query_id.set_lo(state->query_id().lo);

params->fragment_instance_id.set_hi(state->fragment_instance_id().hi);
params->fragment_instance_id.set_lo(state->fragment_instance_id().lo);
params->be_exec_version = state->be_exec_version();
params->query_ctx = state->get_query_ctx();
return params;
}
} // end namespace doris
18 changes: 0 additions & 18 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -662,24 +662,6 @@ class RuntimeState {
RuntimeState(const RuntimeState&);
};

// from runtime state
struct RuntimeFilterParamsContext {
RuntimeFilterParamsContext() = default;
static RuntimeFilterParamsContext* create(RuntimeState* state);

bool runtime_filter_wait_infinitely;
int32_t runtime_filter_wait_time_ms;
bool enable_pipeline_exec;
int32_t execution_timeout;
RuntimeFilterMgr* runtime_filter_mgr;
ExecEnv* exec_env;
PUniqueId query_id;
PUniqueId fragment_instance_id;
int be_exec_version;
QueryContext* query_ctx;
QueryContext* get_query_ctx() const { return query_ctx; }
};

#define RETURN_IF_CANCELLED(state) \
do { \
if (UNLIKELY((state)->is_cancelled())) return Status::Cancelled("Cancelled"); \
Expand Down

0 comments on commit c26c0c3

Please sign in to comment.