diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index efdb8af7029664..7221ba1a972f7c 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -298,24 +298,6 @@ class RuntimePredicateWrapper { _filter_type(type), _filter_id(filter_id) {} - RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool, - const RuntimeFilterParams* params) - : _query_ctx(query_ctx), - _be_exec_version(_query_ctx->be_exec_version()), - _pool(pool), - _column_return_type(params->column_return_type), - _filter_type(params->filter_type), - _filter_id(params->filter_id) {} - // for a 'tmp' runtime predicate wrapper - // only could called assign method or as a param for merge - RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool, PrimitiveType column_type, - RuntimeFilterType type, uint32_t filter_id) - : _query_ctx(query_ctx), - _be_exec_version(_query_ctx->be_exec_version()), - _pool(pool), - _column_return_type(column_type), - _filter_type(type), - _filter_id(filter_id) {} // init runtime filter wrapper // alloc memory to init runtime filter function Status init(const RuntimeFilterParams* params) { @@ -946,7 +928,6 @@ class RuntimePredicateWrapper { private: RuntimeFilterParamsContext* _state; - QueryContext* _query_ctx; int _be_exec_version; ObjectPool* _pool; @@ -971,15 +952,6 @@ Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* poo return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly); } -Status IRuntimeFilter::create(QueryContext* query_ctx, ObjectPool* pool, - const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, - const RuntimeFilterRole role, int node_id, IRuntimeFilter** res, - bool build_bf_exactly) { - *res = pool->add(new IRuntimeFilter(query_ctx, pool, desc)); - (*res)->set_role(role); - return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly); -} - void IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context) { context = _wrapper->_context; } @@ -1036,10 +1008,8 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::listexecution_timeout() * 1000 - : _state->execution_timeout * 1000; - auto runtime_filter_wait_time_ms = _state == nullptr ? _query_ctx->runtime_filter_wait_time_ms() - : _state->runtime_filter_wait_time_ms; + auto execution_timeout = _state->execution_timeout * 1000; + auto runtime_filter_wait_time_ms = _state->runtime_filter_wait_time_ms; // bitmap filter is precise filter and only filter once, so it must be applied. int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER ? execution_timeout @@ -1215,11 +1185,8 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue _probe_expr = iter->second; } - if (_state) { - _wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, ¶ms)); - } else { - _wrapper = _pool->add(new RuntimePredicateWrapper(_query_ctx, _pool, ¶ms)); - } + _wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, ¶ms)); + return _wrapper->init(¶ms); } @@ -1247,7 +1214,7 @@ Status IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state, return _create_wrapper(state, param, pool, wrapper); } -Status IRuntimeFilter::create_wrapper(QueryContext* query_ctx, +Status IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state, const UpdateRuntimeFilterParamsV2* param, ObjectPool* pool, std::unique_ptr* wrapper) { int filter_type = param->request->filter_type(); @@ -1255,7 +1222,7 @@ Status IRuntimeFilter::create_wrapper(QueryContext* query_ctx, if (param->request->has_in_filter()) { column_type = to_primitive_type(param->request->in_filter().column_type()); } - wrapper->reset(new RuntimePredicateWrapper(query_ctx, pool, column_type, get_type(filter_type), + wrapper->reset(new RuntimePredicateWrapper(state, pool, column_type, get_type(filter_type), param->request->filter_id())); switch (filter_type) { @@ -1683,7 +1650,7 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param, } std::unique_ptr tmp_wrapper; - RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_query_ctx, param, _pool, &tmp_wrapper)); + RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, param, _pool, &tmp_wrapper)); auto origin_type = _wrapper->get_real_type(); RETURN_IF_ERROR(_wrapper->merge(tmp_wrapper.get())); if (origin_type != _wrapper->get_real_type()) { diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index cc47d590e6b50b..17660810520f11 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -208,28 +208,6 @@ class IRuntimeFilter { to_string(_runtime_filter_type))), _profile(new RuntimeProfile(_name)) {} - IRuntimeFilter(QueryContext* query_ctx, ObjectPool* pool, const TRuntimeFilterDesc* desc) - : _query_ctx(query_ctx), - _pool(pool), - _filter_id(desc->filter_id), - _is_broadcast_join(true), - _has_remote_target(false), - _has_local_target(false), - _rf_state(RuntimeFilterState::NOT_READY), - _rf_state_atomic(RuntimeFilterState::NOT_READY), - _role(RuntimeFilterRole::PRODUCER), - _expr_order(-1), - _always_true(false), - _is_ignored(false), - registration_time_(MonotonicMillis()), - _wait_infinitely(query_ctx->runtime_filter_wait_infinitely()), - _rf_wait_time_ms(query_ctx->runtime_filter_wait_time_ms()), - _enable_pipeline_exec(query_ctx->enable_pipeline_exec()), - _runtime_filter_type(get_runtime_filter_type(desc)), - _name(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id, - to_string(_runtime_filter_type))), - _profile(new RuntimeProfile(_name)) {} - ~IRuntimeFilter() = default; static Status create(RuntimeFilterParamsContext* state, ObjectPool* pool, @@ -237,10 +215,6 @@ class IRuntimeFilter { const RuntimeFilterRole role, int node_id, IRuntimeFilter** res, bool build_bf_exactly = false); - static Status create(QueryContext* query_ctx, ObjectPool* pool, const TRuntimeFilterDesc* desc, - const TQueryOptions* query_options, const RuntimeFilterRole role, - int node_id, IRuntimeFilter** res, bool build_bf_exactly = false); - void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context); Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context); @@ -307,8 +281,8 @@ class IRuntimeFilter { static Status create_wrapper(RuntimeFilterParamsContext* state, const UpdateRuntimeFilterParams* param, ObjectPool* pool, std::unique_ptr* wrapper); - static Status create_wrapper(QueryContext* query_ctx, const UpdateRuntimeFilterParamsV2* param, - ObjectPool* pool, + static Status create_wrapper(RuntimeFilterParamsContext* state, + const UpdateRuntimeFilterParamsV2* param, ObjectPool* pool, std::unique_ptr* wrapper); void change_to_bloom_filter(); Status init_bloom_filter(const size_t build_bf_cardinality); @@ -370,7 +344,7 @@ class IRuntimeFilter { int32_t wait_time_ms() const { int32_t res = 0; if (wait_infinitely()) { - res = _state == nullptr ? _query_ctx->execution_timeout() : _state->execution_timeout; + res = _state->execution_timeout; // Convert to ms res *= 1000; } else { @@ -424,7 +398,6 @@ class IRuntimeFilter { } RuntimeFilterParamsContext* _state = nullptr; - QueryContext* _query_ctx = nullptr; ObjectPool* _pool = nullptr; // _wrapper is a runtime filter function wrapper // _wrapper should alloc from _pool diff --git a/be/src/exprs/runtime_filter_rpc.cpp b/be/src/exprs/runtime_filter_rpc.cpp index a9aa7944625736..4ca4a78247c488 100644 --- a/be/src/exprs/runtime_filter_rpc.cpp +++ b/be/src/exprs/runtime_filter_rpc.cpp @@ -68,8 +68,8 @@ Status IRuntimeFilter::push_to_remote(RuntimeFilterParamsContext* state, pquery_id->set_lo(_state->query_id.lo()); auto pfragment_instance_id = _rpc_context->request.mutable_fragment_instance_id(); - pfragment_instance_id->set_hi(state->fragment_instance_id.hi()); - pfragment_instance_id->set_lo(state->fragment_instance_id.lo()); + pfragment_instance_id->set_hi(state->fragment_instance_id().hi()); + pfragment_instance_id->set_lo(state->fragment_instance_id().lo()); _rpc_context->request.set_filter_id(_filter_id); _rpc_context->request.set_opt_remote_rf(opt_remote_rf); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 0ac4bc2bd872fb..31ba7b0b882090 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -520,8 +520,8 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( filterparams->query_id.set_hi(_runtime_state->query_id().hi); filterparams->query_id.set_lo(_runtime_state->query_id().lo); - filterparams->fragment_instance_id.set_hi(fragment_instance_id.hi); - filterparams->fragment_instance_id.set_lo(fragment_instance_id.lo); + filterparams->_fragment_instance_id.set_hi(fragment_instance_id.hi); + filterparams->_fragment_instance_id.set_lo(fragment_instance_id.lo); filterparams->be_exec_version = _runtime_state->be_exec_version(); filterparams->query_ctx = _query_ctx.get(); } diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index a9886da5cd4c0f..1f54a0dcde49c6 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -36,12 +36,13 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* timeout_second(-1), _query_id(query_id), _exec_env(exec_env), - _runtime_filter_mgr(new RuntimeFilterMgr(TUniqueId(), this)), _query_options(query_options) { _start_time = VecDateTimeValue::local_time(); _shared_hash_table_controller.reset(new vectorized::SharedHashTableController()); _shared_scanner_controller.reset(new vectorized::SharedScannerController()); _execution_dependency.reset(new pipeline::Dependency(-1, -1, "ExecutionDependency", this)); + _runtime_filter_mgr.reset( + new RuntimeFilterMgr(TUniqueId(), RuntimeFilterParamsContext::create(this))); } QueryContext::~QueryContext() { diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index c1ebf2103f8704..879b225896c181 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -56,9 +56,6 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParams _state->runtime_filter_mgr = this; } -RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx) - : _query_ctx(query_ctx) {} - Status RuntimeFilterMgr::init() { _tracker = std::make_unique("RuntimeFilterMgr", ExecEnv::GetInstance()->experimental_mem_tracker()); @@ -120,7 +117,6 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc if (desc.__isset.opt_remote_rf && desc.opt_remote_rf && desc.has_remote_targets && desc.type == TRuntimeFilterType::BLOOM) { // if this runtime filter has remote target (e.g. need merge), we reuse the runtime filter between all instances - DCHECK(_query_ctx != nullptr); iter = _consumer_map.find(key); if (iter != _consumer_map.end()) { @@ -131,7 +127,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc } } IRuntimeFilter* filter; - RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, &_query_ctx->obj_pool, &desc, &options, + RETURN_IF_ERROR(IRuntimeFilter::create(_state, _state->obj_pool(), &desc, &options, RuntimeFilterRole::CONSUMER, node_id, &filter, build_bf_exactly)); _consumer_map[key].emplace_back(node_id, filter); @@ -569,4 +565,42 @@ void runtime_filter_merge_entity_close(RuntimeFilterMergeController* controller, delete entity; } +RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* state) { + RuntimeFilterParamsContext* params = state->obj_pool()->add(new RuntimeFilterParamsContext()); + params->runtime_filter_wait_infinitely = state->runtime_filter_wait_infinitely(); + params->runtime_filter_wait_time_ms = state->runtime_filter_wait_time_ms(); + params->enable_pipeline_exec = state->enable_pipeline_exec(); + params->execution_timeout = state->execution_timeout(); + params->runtime_filter_mgr = state->runtime_filter_mgr(); + params->exec_env = state->exec_env(); + params->query_id.set_hi(state->query_id().hi); + params->query_id.set_lo(state->query_id().lo); + + params->_fragment_instance_id.set_hi(state->fragment_instance_id().hi); + params->_fragment_instance_id.set_lo(state->fragment_instance_id().lo); + params->be_exec_version = state->be_exec_version(); + params->query_ctx = state->get_query_ctx(); + return params; +} + +RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(QueryContext* query_ctx) { + RuntimeFilterParamsContext* params = query_ctx->obj_pool.add(new RuntimeFilterParamsContext()); + params->runtime_filter_wait_infinitely = query_ctx->runtime_filter_wait_infinitely(); + params->runtime_filter_wait_time_ms = query_ctx->runtime_filter_wait_time_ms(); + params->enable_pipeline_exec = query_ctx->enable_pipeline_exec(); + params->execution_timeout = query_ctx->execution_timeout(); + params->runtime_filter_mgr = query_ctx->runtime_filter_mgr(); + params->exec_env = query_ctx->exec_env(); + params->query_id.set_hi(query_ctx->query_id().hi); + params->query_id.set_lo(query_ctx->query_id().lo); + + // params->fragment_instance_id.set_hi(state->fragment_instance_id().hi); + // params->fragment_instance_id.set_lo(state->fragment_instance_id().lo); + params->be_exec_version = query_ctx->be_exec_version(); + params->query_ctx = query_ctx; + params->_obj_pool = &query_ctx->obj_pool; + params->_is_global = true; + return params; +} + } // namespace doris diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 2541299b5d8fdb..e0340216b56191 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -51,6 +51,7 @@ enum class RuntimeFilterRole; class RuntimePredicateWrapper; class QueryContext; struct RuntimeFilterParamsContext; +class ExecEnv; /// producer: /// Filter filter; @@ -68,8 +69,6 @@ class RuntimeFilterMgr { public: RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParamsContext* state); - RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx); - ~RuntimeFilterMgr() = default; Status init(); @@ -109,7 +108,6 @@ class RuntimeFilterMgr { std::map _producer_map; RuntimeFilterParamsContext* _state = nullptr; - QueryContext* _query_ctx = nullptr; std::unique_ptr _tracker; ObjectPool _pool; @@ -227,4 +225,38 @@ using runtime_filter_merge_entity_closer = std::functionobj_pool()->add(new RuntimeFilterParamsContext()); - params->runtime_filter_wait_infinitely = state->runtime_filter_wait_infinitely(); - params->runtime_filter_wait_time_ms = state->runtime_filter_wait_time_ms(); - params->enable_pipeline_exec = state->enable_pipeline_exec(); - params->execution_timeout = state->execution_timeout(); - params->runtime_filter_mgr = state->runtime_filter_mgr(); - params->exec_env = state->exec_env(); - params->query_id.set_hi(state->query_id().hi); - params->query_id.set_lo(state->query_id().lo); - - params->fragment_instance_id.set_hi(state->fragment_instance_id().hi); - params->fragment_instance_id.set_lo(state->fragment_instance_id().lo); - params->be_exec_version = state->be_exec_version(); - params->query_ctx = state->get_query_ctx(); - return params; -} } // end namespace doris diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index cec230dc3423ab..a4524d846fb376 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -662,24 +662,6 @@ class RuntimeState { RuntimeState(const RuntimeState&); }; -// from runtime state -struct RuntimeFilterParamsContext { - RuntimeFilterParamsContext() = default; - static RuntimeFilterParamsContext* create(RuntimeState* state); - - bool runtime_filter_wait_infinitely; - int32_t runtime_filter_wait_time_ms; - bool enable_pipeline_exec; - int32_t execution_timeout; - RuntimeFilterMgr* runtime_filter_mgr; - ExecEnv* exec_env; - PUniqueId query_id; - PUniqueId fragment_instance_id; - int be_exec_version; - QueryContext* query_ctx; - QueryContext* get_query_ctx() const { return query_ctx; } -}; - #define RETURN_IF_CANCELLED(state) \ do { \ if (UNLIKELY((state)->is_cancelled())) return Status::Cancelled("Cancelled"); \