Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](executor) Support task group schedule in pipeline engine #17615

Merged
merged 23 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
unify query ctx creation
  • Loading branch information
liutang123 committed Mar 29, 2023
commit 887356699f3860c1a41b12170f74b64cdb5e74fe
151 changes: 41 additions & 110 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,35 +584,13 @@ void FragmentMgr::remove_pipeline_context(
}
}

Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
const FinishCallback& cb) {
auto tracer = telemetry::is_current_span_valid() ? telemetry::get_tracer("tracer")
: telemetry::get_noop_tracer();
VLOG_ROW << "exec_plan_fragment params is "
<< apache::thrift::ThriftDebugString(params).c_str();
// sometimes TExecPlanFragmentParams debug string is too long and glog
// will truncate the log line, so print query options separately for debugging purposes
VLOG_ROW << "query options is "
<< apache::thrift::ThriftDebugString(params.query_options).c_str();
START_AND_SCOPE_SPAN(tracer, span, "FragmentMgr::exec_plan_fragment");
const TUniqueId& fragment_instance_id = params.params.fragment_instance_id;
{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _fragment_map.find(fragment_instance_id);
if (iter != _fragment_map.end()) {
// Duplicated
return Status::OK();
}
}

std::shared_ptr<FragmentExecState> exec_state;
std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
bool pipeline_engine_enabled = params.query_options.__isset.enable_pipeline_engine &&
params.query_options.enable_pipeline_engine;
template <typename Params>
Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
std::shared_ptr<QueryFragmentsCtx>& fragments_ctx) {
if (params.is_simplified_param) {
// Get common components from _fragments_ctx_map
std::lock_guard<std::mutex> lock(_lock);
auto search = _fragments_ctx_map.find(params.params.query_id);
auto search = _fragments_ctx_map.find(query_id);
if (search == _fragments_ctx_map.end()) {
return Status::InternalError(
"Failed to get query fragments context. Query may be "
Expand All @@ -624,7 +602,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
// This may be a first fragment request of the query.
// Create the query fragments context.
fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, _exec_env));
fragments_ctx->query_id = params.params.query_id;
fragments_ctx->query_id = query_id;
RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl,
&(fragments_ctx->desc_tbl)));
fragments_ctx->coord_addr = params.coord;
Expand All @@ -641,7 +619,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
}

fragments_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(
pipeline_engine_enabled);
pipeline);
fragments_ctx->timeout_second = params.query_options.execution_timeout;
_set_scan_concurrency(params, fragments_ctx.get());

Expand Down Expand Up @@ -673,7 +651,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
fragments_ctx->query_mem_tracker->enable_print_log_usage();
}

if (params.query_options.enable_pipeline_engine) {
if (pipeline) {
int ts = fragments_ctx->timeout_second;
taskgroup::TaskGroupPtr tg;
auto ts_id = taskgroup::TaskGroupManager::DEFAULT_TG_ID;
Expand All @@ -690,7 +668,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
// Find _fragments_ctx_map again, in case some other request has already
// created the query fragments context.
std::lock_guard<std::mutex> lock(_lock);
auto search = _fragments_ctx_map.find(params.params.query_id);
auto search = _fragments_ctx_map.find(query_id);
if (search == _fragments_ctx_map.end()) {
_fragments_ctx_map.insert(std::make_pair(fragments_ctx->query_id, fragments_ctx));
LOG(INFO) << "Register query/load memory tracker, query/load id: "
Expand All @@ -702,6 +680,36 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
}
}
}
return Status::OK();
}

Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
const FinishCallback& cb) {
auto tracer = telemetry::is_current_span_valid() ? telemetry::get_tracer("tracer")
: telemetry::get_noop_tracer();
VLOG_ROW << "exec_plan_fragment params is "
<< apache::thrift::ThriftDebugString(params).c_str();
// sometimes TExecPlanFragmentParams debug string is too long and glog
// will truncate the log line, so print query options separately for debugging purposes
VLOG_ROW << "query options is "
<< apache::thrift::ThriftDebugString(params.query_options).c_str();
START_AND_SCOPE_SPAN(tracer, span, "FragmentMgr::exec_plan_fragment");
const TUniqueId& fragment_instance_id = params.params.fragment_instance_id;
{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _fragment_map.find(fragment_instance_id);
if (iter != _fragment_map.end()) {
// Duplicated
return Status::OK();
}
}

std::shared_ptr<FragmentExecState> exec_state;
std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
bool pipeline_engine_enabled = params.query_options.__isset.enable_pipeline_engine &&
params.query_options.enable_pipeline_engine;
RETURN_IF_ERROR(
_get_query_ctx(params, params.params.query_id, pipeline_engine_enabled, fragments_ctx));
fragments_ctx->fragment_ids.push_back(fragment_instance_id);

exec_state.reset(
Expand Down Expand Up @@ -801,93 +809,16 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,

std::shared_ptr<FragmentExecState> exec_state;
std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
if (params.is_simplified_param) {
// Get common components from _fragments_ctx_map
std::lock_guard<std::mutex> lock(_lock);
auto search = _fragments_ctx_map.find(params.query_id);
if (search == _fragments_ctx_map.end()) {
return Status::InternalError(
"Failed to get query fragments context. Query may be "
"timeout or be cancelled. host: {}",
BackendOptions::get_localhost());
}
fragments_ctx = search->second;
} else {
// This may be a first fragment request of the query.
// Create the query fragments context.
fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, _exec_env));
fragments_ctx->query_id = params.query_id;
RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl,
&(fragments_ctx->desc_tbl)));
fragments_ctx->coord_addr = params.coord;
LOG(INFO) << "query_id: "
<< UniqueId(fragments_ctx->query_id.hi, fragments_ctx->query_id.lo)
<< " coord_addr " << fragments_ctx->coord_addr
<< " total fragment num on current host: " << params.fragment_num_on_host;
fragments_ctx->query_globals = params.query_globals;

if (params.__isset.resource_info) {
fragments_ctx->user = params.resource_info.user;
fragments_ctx->group = params.resource_info.group;
fragments_ctx->set_rsc_info = true;
}

fragments_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(true);
fragments_ctx->timeout_second = params.query_options.execution_timeout;
_set_scan_concurrency(params, fragments_ctx.get());

bool has_query_mem_tracker =
params.query_options.__isset.mem_limit && (params.query_options.mem_limit > 0);
int64_t bytes_limit = has_query_mem_tracker ? params.query_options.mem_limit : -1;
if (bytes_limit > MemInfo::mem_limit()) {
VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES)
<< " exceeds process memory limit of "
<< PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
<< ". Using process memory limit instead";
bytes_limit = MemInfo::mem_limit();
}
if (params.query_options.query_type == TQueryType::SELECT) {
fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::QUERY,
fmt::format("Query#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
} else if (params.query_options.query_type == TQueryType::LOAD) {
fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD,
fmt::format("Load#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
} else { // EXTERNAL
fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD,
fmt::format("External#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
}
if (params.query_options.__isset.is_report_success &&
params.query_options.is_report_success) {
fragments_ctx->query_mem_tracker->enable_print_log_usage();
}
{
// Find _fragments_ctx_map again, in case some other request has already
// created the query fragments context.
std::lock_guard<std::mutex> lock(_lock);
auto search = _fragments_ctx_map.find(params.query_id);
if (search == _fragments_ctx_map.end()) {
_fragments_ctx_map.insert(std::make_pair(fragments_ctx->query_id, fragments_ctx));
LOG(INFO) << "Register query/load memory tracker, query/load id: "
<< print_id(fragments_ctx->query_id)
<< " limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES);
} else {
// Already has a query fragments context, use it
fragments_ctx = search->second;
}
}
}
RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, fragments_ctx));

for (size_t i = 0; i < params.local_params.size(); i++) {
const auto& local_params = params.local_params[i];

const TUniqueId& fragment_instance_id = local_params.fragment_instance_id;
{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _fragment_map.find(fragment_instance_id);
if (iter != _fragment_map.end()) {
auto iter = _pipeline_map.find(fragment_instance_id);
if (iter != _pipeline_map.end()) {
// Duplicated
continue;
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ class FragmentMgr : public RestMonitorIface {
RuntimeState* state,
QueryFragmentsCtx* fragments_ctx);

// Resolves the QueryFragmentsCtx for `query_id` and hands it back through
// `fragments_ctx`.
//
// - If `params.is_simplified_param` is set, the context must already be
//   registered in _fragments_ctx_map; otherwise an InternalError is returned
//   (the query may have timed out or been cancelled).
// - Otherwise a new context is built from `params` and registered in
//   _fragments_ctx_map, unless another request registered one for the same
//   query in the meantime, in which case the existing context is used.
//
// `pipeline` tells whether the pipeline engine is enabled for this request;
// it is forwarded to the shared hash table controller and gates the
// pipeline-only setup of a newly created context.
//
// Templated on the request type so that the exec_plan_fragment overloads for
// TExecPlanFragmentParams and TPipelineFragmentParams share one implementation.
// `query_id` is passed separately from `params`; the two callers read it from
// different fields (`params.params.query_id` vs. `params.query_id`).
template <typename Params>
Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
std::shared_ptr<QueryFragmentsCtx>& fragments_ctx);

// This is input params
ExecEnv* _exec_env;

Expand Down