Skip to content

Commit

Permalink
Revert "[Enhancement] Avoid call QueryCtx destructor when hold mutex (S…
Browse files Browse the repository at this point in the history
…tarRocks#16602)"

This reverts commit 15c6e64.
  • Loading branch information
kangkaisen committed Mar 7, 2023
1 parent 147416d commit b0ec644
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 78 deletions.
65 changes: 13 additions & 52 deletions be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
#include "runtime/stream_load/transaction_mgr.h"
#include "util/debug/query_trace.h"
#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
#include "util/time.h"
#include "util/uid_util.h"

Expand Down Expand Up @@ -603,66 +602,28 @@ Status FragmentExecutor::prepare(ExecEnv* exec_env, const TExecPlanFragmentParam
UnifiedExecPlanFragmentParams request(common_request, unique_request);

bool prepare_success = false;
struct {
int64_t prepare_time = 0;
int64_t prepare_query_ctx_time = 0;
int64_t prepare_fragment_ctx_time = 0;
int64_t prepare_runtime_state_time = 0;
int64_t prepare_pipeline_driver_time = 0;
} profiler;

DeferOp defer([this, &request, &prepare_success, &profiler]() {
int64_t prepare_time = 0;
DeferOp defer([this, &request, &prepare_success, &prepare_time]() {
if (prepare_success) {
auto fragment_ctx = _query_ctx->fragment_mgr()->get(request.fragment_instance_id());
auto* prepare_timer =
ADD_TIMER(fragment_ctx->runtime_state()->runtime_profile(), "FragmentInstancePrepareTime");
COUNTER_SET(prepare_timer, profiler.prepare_time);
auto* prepare_query_ctx_timer =
ADD_CHILD_TIMER_THESHOLD(fragment_ctx->runtime_state()->runtime_profile(), "prepare-query-ctx",
"FragmentInstancePrepareTime", 10_ms);
COUNTER_SET(prepare_query_ctx_timer, profiler.prepare_query_ctx_time);

auto* prepare_fragment_ctx_timer =
ADD_CHILD_TIMER_THESHOLD(fragment_ctx->runtime_state()->runtime_profile(), "prepare-fragment-ctx",
"FragmentInstancePrepareTime", 10_ms);
COUNTER_SET(prepare_fragment_ctx_timer, profiler.prepare_fragment_ctx_time);

auto* prepare_runtime_state_timer =
ADD_CHILD_TIMER_THESHOLD(fragment_ctx->runtime_state()->runtime_profile(), "prepare-runtime-state",
"FragmentInstancePrepareTime", 10_ms);
COUNTER_SET(prepare_runtime_state_timer, profiler.prepare_runtime_state_time);

auto* prepare_pipeline_driver_timer =
ADD_CHILD_TIMER_THESHOLD(fragment_ctx->runtime_state()->runtime_profile(),
"prepare-pipeline-driver", "FragmentInstancePrepareTime", 10_ms);
COUNTER_SET(prepare_pipeline_driver_timer, profiler.prepare_runtime_state_time);
COUNTER_SET(prepare_timer, prepare_time);
} else {
_fail_cleanup();
}
});

SCOPED_RAW_TIMER(&profiler.prepare_time);
SCOPED_RAW_TIMER(&prepare_time);
RETURN_IF_ERROR(exec_env->query_pool_mem_tracker()->check_mem_limit("Start execute plan fragment."));
{
SCOPED_RAW_TIMER(&profiler.prepare_query_ctx_time);
RETURN_IF_ERROR(_prepare_query_ctx(exec_env, request));
}
{
SCOPED_RAW_TIMER(&profiler.prepare_fragment_ctx_time);
RETURN_IF_ERROR(_prepare_fragment_ctx(request));
}
{
SCOPED_RAW_TIMER(&profiler.prepare_runtime_state_time);
RETURN_IF_ERROR(_prepare_workgroup(request));
RETURN_IF_ERROR(_prepare_runtime_state(exec_env, request));
RETURN_IF_ERROR(_prepare_exec_plan(exec_env, request));
RETURN_IF_ERROR(_prepare_global_dict(request));
}
{
SCOPED_RAW_TIMER(&profiler.prepare_pipeline_driver_time);
RETURN_IF_ERROR(_prepare_pipeline_driver(exec_env, request));
RETURN_IF_ERROR(_prepare_stream_load_pipe(exec_env, request));
}

RETURN_IF_ERROR(_prepare_query_ctx(exec_env, request));
RETURN_IF_ERROR(_prepare_fragment_ctx(request));
RETURN_IF_ERROR(_prepare_workgroup(request));
RETURN_IF_ERROR(_prepare_runtime_state(exec_env, request));
RETURN_IF_ERROR(_prepare_exec_plan(exec_env, request));
RETURN_IF_ERROR(_prepare_global_dict(request));
RETURN_IF_ERROR(_prepare_pipeline_driver(exec_env, request));
RETURN_IF_ERROR(_prepare_stream_load_pipe(exec_env, request));

RETURN_IF_ERROR(_query_ctx->fragment_mgr()->register_ctx(request.fragment_instance_id(), _fragment_ctx));
prepare_success = true;
Expand Down
18 changes: 4 additions & 14 deletions be/src/exec/pipeline/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include "exec/pipeline/query_context.h"

#include <memory>
#include <vector>

#include "agent/master_info.h"
#include "exec/pipeline/fragment_context.h"
Expand Down Expand Up @@ -207,13 +206,11 @@ Status QueryContextManager::init() {
return Status::InternalError("Fail to create clean_thread of QueryContextManager");
}
}
void QueryContextManager::_clean_slot_unlocked(size_t i, std::vector<QueryContextPtr>& del) {
void QueryContextManager::_clean_slot_unlocked(size_t i) {
auto& sc_map = _second_chance_maps[i];
auto sc_it = sc_map.begin();
while (sc_it != sc_map.end()) {
if (sc_it->second->has_no_active_instances() && sc_it->second->is_delivery_expired()) {
del.emplace_back(std::move(sc_it->second));
LOG(WARNING) << " clean query " << print_id(sc_it->first) << "from the second map";
sc_it = sc_map.erase(sc_it);
} else {
++sc_it;
Expand All @@ -223,9 +220,8 @@ void QueryContextManager::_clean_slot_unlocked(size_t i, std::vector<QueryContex
void QueryContextManager::_clean_query_contexts() {
for (auto i = 0; i < _num_slots; ++i) {
auto& mutex = _mutexes[i];
std::vector<QueryContextPtr> del_list;
std::unique_lock write_lock(mutex);
_clean_slot_unlocked(i, del_list);
_clean_slot_unlocked(i);
}
}

Expand Down Expand Up @@ -338,13 +334,8 @@ bool QueryContextManager::remove(const TUniqueId& query_id) {
auto& context_map = _context_maps[i];
auto& sc_map = _second_chance_maps[i];

// retain the query_ctx reference to avoid call destructors while holding a lock
// we should define them before hold the write lock
QueryContextPtr query_ctx;
std::vector<QueryContextPtr> del_list;

std::unique_lock<std::shared_mutex> write_lock(mutex);
_clean_slot_unlocked(i, del_list);
_clean_slot_unlocked(i);
// return directly if query_ctx is absent
auto it = context_map.find(query_id);
if (it == context_map.end()) {
Expand All @@ -353,7 +344,6 @@ bool QueryContextManager::remove(const TUniqueId& query_id) {

// the query context is really dead, so just cleanup
if (it->second->is_dead()) {
query_ctx = std::move(it->second);
context_map.erase(it);
LOG(WARNING) << " remove query " << print_id(query_id) << " from the context_map";
return true;
Expand Down Expand Up @@ -428,7 +418,7 @@ void QueryContextManager::report_fragments_with_same_host(
params.__set_backend_id(backend_id.value());
}

report_exec_status_params_vector.emplace_back(std::move(params));
report_exec_status_params_vector.push_back(params);
cur_batch_report_indexes.push_back(i);
reported[i] = true;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class QueryContextManager {
void _stop_clean_func() { _stop.store(true); }
bool _is_stopped() { return _stop; }
size_t _slot_idx(const TUniqueId& query_id);
void _clean_slot_unlocked(size_t i, std::vector<QueryContextPtr>& del);
void _clean_slot_unlocked(size_t i);

private:
const size_t _num_slots;
Expand Down
10 changes: 3 additions & 7 deletions be/src/util/runtime_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1018,13 +1018,9 @@ void RuntimeProfile::print_child_counters(const std::string& prefix, const std::
for (const std::string& child_counter : child_counters) {
auto iter = counter_map.find(child_counter);
DCHECK(iter != counter_map.end());
auto value = iter->second.first->value();
auto display_threshold = iter->second.first->display_threshold();
if (display_threshold > 0 && value > display_threshold) {
stream << prefix << " - " << iter->first << ": "
<< PrettyPrinter::print(iter->second.first->value(), iter->second.first->type()) << std::endl;
RuntimeProfile::print_child_counters(prefix + " ", child_counter, counter_map, child_counter_map, s);
}
stream << prefix << " - " << iter->first << ": "
<< PrettyPrinter::print(iter->second.first->value(), iter->second.first->type()) << std::endl;
RuntimeProfile::print_child_counters(prefix + " ", child_counter, counter_map, child_counter_map, s);
}
}
}
Expand Down
4 changes: 0 additions & 4 deletions be/src/util/runtime_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@

namespace starrocks {

inline unsigned long long operator"" _ms(unsigned long long x) {
return x * 1000 * 1000;
}

// Define macros for updating counters. The macros make it very easy to disable
// all counters at compile time. Set this to 0 to remove counters. This is useful
// to do to make sure the counters aren't affecting the system.
Expand Down

0 comments on commit b0ec644

Please sign in to comment.