[Enhancement] Avoid call QueryCtx destructor when hold mutex (#16602)
(cherry picked from commit 15c6e64)

# Conflicts:
#	be/src/exec/pipeline/fragment_executor.cpp
#	be/src/exec/pipeline/query_context.cpp
stdpain authored and mergify[bot] committed Jan 28, 2023
1 parent e0cedb2 commit 76a354d
Showing 5 changed files with 311 additions and 22 deletions.
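The core change defers destruction of `QueryContext` objects until after the slot mutex is released: entries removed from the guarded maps are moved into locals declared before the lock, so their destructors (which can be expensive and may re-enter the manager) only run once the `unique_lock` has gone out of scope. A minimal sketch of that idiom, with hypothetical `Entry`/`Registry` names standing in for `QueryContextPtr` and `QueryContextManager`:

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <vector>

struct Entry {
    bool expired = false;  // hypothetical expiry flag
    ~Entry() { /* potentially slow teardown; must not run under the registry lock */ }
};
using EntryPtr = std::shared_ptr<Entry>;

class Registry {
public:
    bool remove(int key) {
        // Declared BEFORE the lock: locals are destroyed in reverse order of
        // declaration, so these shared_ptrs (and any Entry destructors they
        // trigger) are released only after write_lock has been unlocked.
        EntryPtr removed;
        std::vector<EntryPtr> expired;

        std::unique_lock<std::shared_mutex> write_lock(_mutex);
        // Move expired entries out of the map instead of destroying them here.
        for (auto it = _map.begin(); it != _map.end();) {
            if (it->second->expired) {
                expired.emplace_back(std::move(it->second));
                it = _map.erase(it);
            } else {
                ++it;
            }
        }
        if (auto it = _map.find(key); it != _map.end()) {
            removed = std::move(it->second);  // keep the reference alive past the erase
            _map.erase(it);
            return true;
        }
        return false;
    }

private:
    std::shared_mutex _mutex;
    std::map<int, EntryPtr> _map;
};
```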
60 changes: 55 additions & 5 deletions be/src/exec/pipeline/fragment_executor.cpp
@@ -33,6 +33,7 @@
#include "runtime/result_sink.h"
#include "util/debug/query_trace.h"
#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
#include "util/time.h"
#include "util/uid_util.h"

@@ -484,27 +485,76 @@ Status FragmentExecutor::prepare(ExecEnv* exec_env, const TExecPlanFragmentParam
UnifiedExecPlanFragmentParams request(common_request, unique_request);

bool prepare_success = false;
int64_t prepare_time = 0;
DeferOp defer([this, &request, &prepare_success, &prepare_time]() {
struct {
int64_t prepare_time = 0;
int64_t prepare_query_ctx_time = 0;
int64_t prepare_fragment_ctx_time = 0;
int64_t prepare_runtime_state_time = 0;
int64_t prepare_pipeline_driver_time = 0;
} profiler;

DeferOp defer([this, &request, &prepare_success, &profiler]() {
if (prepare_success) {
auto fragment_ctx = _query_ctx->fragment_mgr()->get(request.fragment_instance_id());
auto* prepare_timer =
ADD_TIMER(fragment_ctx->runtime_state()->runtime_profile(), "FragmentInstancePrepareTime");
COUNTER_SET(prepare_timer, prepare_time);
COUNTER_SET(prepare_timer, profiler.prepare_time);
auto* prepare_query_ctx_timer =
ADD_CHILD_TIMER_THESHOLD(fragment_ctx->runtime_state()->runtime_profile(), "prepare-query-ctx",
"FragmentInstancePrepareTime", 10_ms);
COUNTER_SET(prepare_query_ctx_timer, profiler.prepare_query_ctx_time);

auto* prepare_fragment_ctx_timer =
ADD_CHILD_TIMER_THESHOLD(fragment_ctx->runtime_state()->runtime_profile(), "prepare-fragment-ctx",
"FragmentInstancePrepareTime", 10_ms);
COUNTER_SET(prepare_fragment_ctx_timer, profiler.prepare_fragment_ctx_time);

auto* prepare_runtime_state_timer =
ADD_CHILD_TIMER_THESHOLD(fragment_ctx->runtime_state()->runtime_profile(), "prepare-runtime-state",
"FragmentInstancePrepareTime", 10_ms);
COUNTER_SET(prepare_runtime_state_timer, profiler.prepare_runtime_state_time);

auto* prepare_pipeline_driver_timer =
ADD_CHILD_TIMER_THESHOLD(fragment_ctx->runtime_state()->runtime_profile(),
"prepare-pipeline-driver", "FragmentInstancePrepareTime", 10_ms);
COUNTER_SET(prepare_pipeline_driver_timer, profiler.prepare_pipeline_driver_time);
} else {
_fail_cleanup();
}
});
SCOPED_RAW_TIMER(&prepare_time);
RETURN_IF_ERROR(exec_env->query_pool_mem_tracker()->check_mem_limit("Start execute plan fragment."));

<<<<<<< HEAD
RETURN_IF_ERROR(_prepare_query_ctx(exec_env, request));
RETURN_IF_ERROR(_prepare_fragment_ctx(request));
RETURN_IF_ERROR(_prepare_workgroup(request));
RETURN_IF_ERROR(_prepare_runtime_state(exec_env, request));
RETURN_IF_ERROR(_prepare_exec_plan(exec_env, request));
RETURN_IF_ERROR(_prepare_global_dict(request));
RETURN_IF_ERROR(_prepare_pipeline_driver(exec_env, request));
=======
SCOPED_RAW_TIMER(&profiler.prepare_time);
RETURN_IF_ERROR(exec_env->query_pool_mem_tracker()->check_mem_limit("Start execute plan fragment."));
{
SCOPED_RAW_TIMER(&profiler.prepare_query_ctx_time);
RETURN_IF_ERROR(_prepare_query_ctx(exec_env, request));
}
{
SCOPED_RAW_TIMER(&profiler.prepare_fragment_ctx_time);
RETURN_IF_ERROR(_prepare_fragment_ctx(request));
}
{
SCOPED_RAW_TIMER(&profiler.prepare_runtime_state_time);
RETURN_IF_ERROR(_prepare_workgroup(request));
RETURN_IF_ERROR(_prepare_runtime_state(exec_env, request));
RETURN_IF_ERROR(_prepare_exec_plan(exec_env, request));
RETURN_IF_ERROR(_prepare_global_dict(request));
}
{
SCOPED_RAW_TIMER(&profiler.prepare_pipeline_driver_time);
RETURN_IF_ERROR(_prepare_pipeline_driver(exec_env, request));
RETURN_IF_ERROR(_prepare_stream_load_pipe(exec_env, request));
}
>>>>>>> 15c6e64da ([Enhancement] Avoid call QueryCtx destructor when hold mutex (#16602))

_query_ctx->fragment_mgr()->register_ctx(request.fragment_instance_id(), _fragment_ctx);
prepare_success = true;
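The prepare path is also split into phases, each measured with `SCOPED_RAW_TIMER` into a local `profiler` struct and published to the fragment's `RuntimeProfile` inside the `DeferOp`. Below is a minimal, self-contained sketch of that RAII timing idiom; `ScopedRawTimer` is an assumed name that only approximates what the StarRocks macro expands to:

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>

// Accumulates the elapsed wall-clock nanoseconds of its enclosing scope into
// *target, roughly what SCOPED_RAW_TIMER(&counter) does via a hidden local.
class ScopedRawTimer {
public:
    explicit ScopedRawTimer(int64_t* target)
            : _target(target), _start(std::chrono::steady_clock::now()) {}
    ~ScopedRawTimer() {
        auto end = std::chrono::steady_clock::now();
        *_target += std::chrono::duration_cast<std::chrono::nanoseconds>(end - _start).count();
    }

private:
    int64_t* _target;
    std::chrono::steady_clock::time_point _start;
};

int main() {
    struct {
        int64_t prepare_time = 0;
        int64_t prepare_query_ctx_time = 0;
    } profiler;

    {
        ScopedRawTimer total(&profiler.prepare_time);
        {
            ScopedRawTimer phase(&profiler.prepare_query_ctx_time);
            // ... phase work, e.g. preparing the query context ...
        }
        // ... remaining phases, each in its own scope ...
    }
    std::cout << "prepare: " << profiler.prepare_time << " ns, "
              << "query-ctx: " << profiler.prepare_query_ctx_time << " ns\n";
    return 0;
}
```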
229 changes: 226 additions & 3 deletions be/src/exec/pipeline/query_context.cpp
@@ -2,8 +2,10 @@
#include "exec/pipeline/query_context.h"

#include <memory>
#include <vector>

#include "exec/pipeline/fragment_context.h"
#include "exec/pipeline/pipeline_fwd.h"
#include "exec/workgroup/work_group.h"
#include "runtime/current_thread.h"
#include "runtime/data_stream_mgr.h"
@@ -158,11 +160,12 @@ Status QueryContextManager::init() {
return Status::InternalError("Fail to create clean_thread of QueryContextManager");
}
}
void QueryContextManager::_clean_slot_unlocked(size_t i) {
void QueryContextManager::_clean_slot_unlocked(size_t i, std::vector<QueryContextPtr>& del) {
auto& sc_map = _second_chance_maps[i];
auto sc_it = sc_map.begin();
while (sc_it != sc_map.end()) {
if (sc_it->second->has_no_active_instances() && sc_it->second->is_delivery_expired()) {
del.emplace_back(std::move(sc_it->second));
sc_it = sc_map.erase(sc_it);
} else {
++sc_it;
@@ -172,8 +175,9 @@ void QueryContextManager::_clean_slot_unlocked(size_t i) {
void QueryContextManager::_clean_query_contexts() {
for (auto i = 0; i < _num_slots; ++i) {
auto& mutex = _mutexes[i];
std::vector<QueryContextPtr> del_list;
std::unique_lock write_lock(mutex);
_clean_slot_unlocked(i);
_clean_slot_unlocked(i, del_list);
}
}

@@ -283,8 +287,13 @@ bool QueryContextManager::remove(const TUniqueId& query_id) {
auto& context_map = _context_maps[i];
auto& sc_map = _second_chance_maps[i];

// Retain the query_ctx references to avoid calling destructors while holding the lock.
// They must be defined before the write lock is taken so they are destroyed after it is released.
QueryContextPtr query_ctx;
std::vector<QueryContextPtr> del_list;

std::unique_lock<std::shared_mutex> write_lock(mutex);
_clean_slot_unlocked(i);
_clean_slot_unlocked(i, del_list);
// return directly if query_ctx is absent
auto it = context_map.find(query_id);
if (it == context_map.end()) {
@@ -293,6 +302,7 @@

// the query context is really dead, so just cleanup
if (it->second->is_dead()) {
query_ctx = std::move(it->second);
context_map.erase(it);
return true;
} else if (it->second->has_no_active_instances()) {
@@ -318,4 +328,217 @@ void QueryContextManager::clear() {
_context_maps.clear();
}

<<<<<<< HEAD
=======
void QueryContextManager::report_fragments_with_same_host(
const std::vector<std::shared_ptr<FragmentContext>>& need_report_fragment_context, std::vector<bool>& reported,
const TNetworkAddress& last_coord_addr, std::vector<TReportExecStatusParams>& report_exec_status_params_vector,
std::vector<int32_t>& cur_batch_report_indexes) {
for (int i = 0; i < need_report_fragment_context.size(); i++) {
if (reported[i] == false) {
FragmentContext* fragment_ctx = need_report_fragment_context[i].get();

if (fragment_ctx->all_pipelines_finished()) {
reported[i] = true;
continue;
}

Status fragment_ctx_status = fragment_ctx->final_status();
if (!fragment_ctx_status.ok()) {
reported[i] = true;
starrocks::ExecEnv::GetInstance()->profile_report_worker()->unregister_pipeline_load(
fragment_ctx->query_id(), fragment_ctx->fragment_instance_id());
continue;
}

Status fe_connection_status;
auto fe_addr = fragment_ctx->fe_addr();
auto fragment_id = fragment_ctx->fragment_instance_id();
auto* runtime_state = fragment_ctx->runtime_state();
DCHECK(runtime_state != nullptr);

if (fe_addr == last_coord_addr) {
TReportExecStatusParams params;

params.protocol_version = FrontendServiceVersion::V1;
params.__set_query_id(fragment_ctx->query_id());
params.__set_backend_num(runtime_state->be_number());
params.__set_fragment_instance_id(fragment_id);
fragment_ctx_status.set_t_status(&params);
params.__set_done(false);

if (runtime_state->query_options().query_type == TQueryType::LOAD) {
runtime_state->update_report_load_status(&params);
params.__set_load_type(runtime_state->query_options().load_job_type);
}

auto backend_id = get_backend_id();
if (backend_id.has_value()) {
params.__set_backend_id(backend_id.value());
}

report_exec_status_params_vector.emplace_back(std::move(params));
cur_batch_report_indexes.push_back(i);
reported[i] = true;
}
}
}
}

void QueryContextManager::collect_query_statistics(const PCollectQueryStatisticsRequest* request,
PCollectQueryStatisticsResult* response) {
for (int i = 0; i < request->query_ids_size(); i++) {
const PUniqueId& p_query_id = request->query_ids(i);
TUniqueId id;
id.__set_hi(p_query_id.hi());
id.__set_lo(p_query_id.lo());
if (auto query_ctx = get(id); query_ctx != nullptr) {
int64_t cpu_cost = query_ctx->cpu_cost();
int64_t scan_rows = query_ctx->cur_scan_rows_num();
int64_t scan_bytes = query_ctx->get_scan_bytes();
auto query_statistics = response->add_query_statistics();
auto query_id = query_statistics->mutable_query_id();
query_id->set_hi(p_query_id.hi());
query_id->set_lo(p_query_id.lo());
query_statistics->set_cpu_cost_ns(cpu_cost);
query_statistics->set_scan_rows(scan_rows);
query_statistics->set_scan_bytes(scan_bytes);
}
}
}

void QueryContextManager::report_fragments(
const std::vector<PipeLineReportTaskKey>& pipeline_need_report_query_fragment_ids) {
std::vector<std::shared_ptr<FragmentContext>> need_report_fragment_context;
std::vector<std::shared_ptr<QueryContext>> need_report_query_ctx;

std::vector<PipeLineReportTaskKey> fragment_context_non_exist;

for (const auto& key : pipeline_need_report_query_fragment_ids) {
TUniqueId query_id = key.query_id;
TUniqueId fragment_instance_id = key.fragment_instance_id;
auto query_ctx = get(query_id);
if (!query_ctx) {
fragment_context_non_exist.push_back(key);
continue;
}
need_report_query_ctx.push_back(query_ctx);
auto fragment_ctx = query_ctx->fragment_mgr()->get(fragment_instance_id);
if (!fragment_ctx) {
fragment_context_non_exist.push_back(key);
continue;
}
need_report_fragment_context.push_back(fragment_ctx);
}

std::vector<bool> reported(need_report_fragment_context.size(), false);
for (int i = 0; i < need_report_fragment_context.size(); i++) {
if (reported[i] == false) {
reported[i] = true;

FragmentContext* fragment_ctx = need_report_fragment_context[i].get();

if (fragment_ctx->all_pipelines_finished()) {
continue;
}

Status fragment_ctx_status = fragment_ctx->final_status();
if (!fragment_ctx_status.ok()) {
starrocks::ExecEnv::GetInstance()->profile_report_worker()->unregister_pipeline_load(
fragment_ctx->query_id(), fragment_ctx->fragment_instance_id());
continue;
}

Status fe_connection_status;
auto fe_addr = fragment_ctx->fe_addr();
auto exec_env = fragment_ctx->runtime_state()->exec_env();
auto fragment_id = fragment_ctx->fragment_instance_id();
auto* runtime_state = fragment_ctx->runtime_state();
DCHECK(runtime_state != nullptr);

FrontendServiceConnection fe_connection(exec_env->frontend_client_cache(), fe_addr, &fe_connection_status);
if (!fe_connection_status.ok()) {
std::stringstream ss;
ss << "couldn't get a client for " << fe_addr;
LOG(WARNING) << ss.str();
starrocks::ExecEnv::GetInstance()->profile_report_worker()->unregister_pipeline_load(
fragment_ctx->query_id(), fragment_ctx->fragment_instance_id());
exec_env->frontend_client_cache()->close_connections(fe_addr);
continue;
}

std::vector<TReportExecStatusParams> report_exec_status_params_vector;

TReportExecStatusParams params;

params.protocol_version = FrontendServiceVersion::V1;
params.__set_query_id(fragment_ctx->query_id());
params.__set_backend_num(runtime_state->be_number());
params.__set_fragment_instance_id(fragment_id);
fragment_ctx_status.set_t_status(&params);
params.__set_done(false);

if (runtime_state->query_options().query_type == TQueryType::LOAD) {
runtime_state->update_report_load_status(&params);
params.__set_load_type(runtime_state->query_options().load_job_type);
}

auto backend_id = get_backend_id();
if (backend_id.has_value()) {
params.__set_backend_id(backend_id.value());
}

report_exec_status_params_vector.push_back(params);

std::vector<int32_t> cur_batch_report_indexes;
cur_batch_report_indexes.push_back(i);

report_fragments_with_same_host(need_report_fragment_context, reported, fe_addr,
report_exec_status_params_vector, cur_batch_report_indexes);

TBatchReportExecStatusParams report_batch;
report_batch.__set_params_list(report_exec_status_params_vector);

TBatchReportExecStatusResult res;
Status rpc_status;

VLOG_ROW << "debug: reportExecStatus params is " << apache::thrift::ThriftDebugString(params).c_str();

try {
try {
fe_connection->batchReportExecStatus(res, report_batch);
} catch (TTransportException& e) {
LOG(WARNING) << "Retrying ReportExecStatus: " << e.what();
rpc_status = fe_connection.reopen();
if (!rpc_status.ok()) {
continue;
}
fe_connection->batchReportExecStatus(res, report_batch);
}

} catch (TException& e) {
std::stringstream msg;
msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
}

const std::vector<TStatus>& status_list = res.status_list;
for (int j = 0; j < status_list.size(); j++) {
Status rpc_status = Status(status_list[j]);
if (!rpc_status.ok()) {
int32_t index = cur_batch_report_indexes[j];
FragmentContext* fragment_ctx = need_report_fragment_context[index].get();
fragment_ctx->cancel(rpc_status);
}
}
}
}

for (const auto& key : fragment_context_non_exist) {
starrocks::ExecEnv::GetInstance()->profile_report_worker()->unregister_pipeline_load(key.query_id,
key.fragment_instance_id);
}
}

>>>>>>> 15c6e64da ([Enhancement] Avoid call QueryCtx destructor when hold mutex (#16602))
} // namespace starrocks::pipeline
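`report_fragments` batches per-coordinator status reports and retries a failed Thrift call exactly once after reopening the connection (catch the transport error, reopen, call again; give up on any other failure). A minimal, library-agnostic sketch of that retry-once pattern; `RpcError`, `call_with_retry_once`, and the callables are hypothetical stand-ins for the Thrift client and `TTransportException`:

```cpp
#include <functional>
#include <iostream>
#include <stdexcept>

// Hypothetical stand-in for TTransportException.
struct RpcError : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Invoke `call`; on a transport error, try `reopen` once and retry the call.
// Returns false if the connection could not be reopened or the retry failed.
bool call_with_retry_once(const std::function<void()>& call, const std::function<bool()>& reopen) {
    try {
        try {
            call();
        } catch (const RpcError& e) {
            std::cerr << "Retrying RPC: " << e.what() << '\n';
            if (!reopen()) {
                return false;
            }
            call();
        }
    } catch (const std::exception& e) {
        std::cerr << "RPC failed: " << e.what() << '\n';
        return false;
    }
    return true;
}

int main() {
    int attempts = 0;
    bool ok = call_with_retry_once(
            [&] {
                if (++attempts == 1) throw RpcError("transport closed");  // first attempt fails
            },
            [] { return true; });  // reopen succeeds, so the second attempt goes through
    std::cout << (ok ? "reported" : "dropped") << " after " << attempts << " attempt(s)\n";
    return 0;
}
```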
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/query_context.h
@@ -201,7 +201,7 @@ class QueryContextManager {
void _stop_clean_func() { _stop.store(true); }
bool _is_stopped() { return _stop; }
size_t _slot_idx(const TUniqueId& query_id);
void _clean_slot_unlocked(size_t i);
void _clean_slot_unlocked(size_t i, std::vector<QueryContextPtr>& del);

private:
const size_t _num_slots;
