Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Mar 13, 2024
1 parent 3795455 commit ea1c824
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 14 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ DEFINE_mBool(disable_memory_gc, "false");

DEFINE_mInt64(large_memory_check_bytes, "2147483648");

DEFINE_mBool(disable_memory_orphan_check, "false");
DEFINE_mBool(enable_memory_orphan_check, "true");

// The maximum time a thread waits for full GC. Currently only query will wait for full gc.
DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000");
Expand Down
4 changes: 2 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ DECLARE_mBool(disable_memory_gc);
// If is -1, disable large memory check.
DECLARE_mInt64(large_memory_check_bytes);

// default is false. if any memory tracking in Orphan mem tracker will report error.
DECLARE_mBool(disable_memory_orphan_check);
// default is true. if any memory tracking in Orphan mem tracker will report error.
DECLARE_mBool(enable_memory_orphan_check);

// The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
DECLARE_mInt32(thread_wait_gc_max_milliseconds);
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
auto st = _query_ctx->exec_status();
_tasks.clear();
_query_ctx.reset();
if (!_task_runtime_states.empty()) {
for (auto& runtime_state : _task_runtime_states) {
_call_back(runtime_state.get(), &st);
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,

RETURN_IF_ERROR(
_get_query_ctx(params, params.params.query_id, pipeline_engine_enabled, query_ctx));
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
{
// Need lock here, because it will modify fragment ids and std::vector may resize and reallocate
// memory, but query_is_canncelled will traverse the vector, it will core.
Expand Down Expand Up @@ -818,6 +819,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,

std::shared_ptr<QueryContext> query_ctx;
RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx));
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);

const bool enable_pipeline_x = params.query_options.__isset.enable_pipeline_x_engine &&
params.query_options.enable_pipeline_x_engine;
Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/memory/thread_mem_tracker_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ void ThreadMemTrackerMgr::detach_limiter_tracker(
}

void ThreadMemTrackerMgr::cancel_query(const std::string& exceed_msg) {
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
_query_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, exceed_msg);
if (is_attach_query()) {
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
_query_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, exceed_msg);
}
}

} // namespace doris
2 changes: 1 addition & 1 deletion be/src/runtime/thread_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class MemTracker;

QueryThreadContext ThreadContext::query_thread_context() {
DCHECK(doris::pthread_context_ptr_init);
CHECK(config::disable_memory_orphan_check || thread_mem_tracker()->label() != "Orphan");
CHECK(!config::enable_memory_orphan_check || thread_mem_tracker()->label() != "Orphan");
return {_task_id, thread_mem_tracker_mgr->limiter_mem_tracker()};
}

Expand Down
18 changes: 11 additions & 7 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ class ThreadContext {
QueryThreadContext query_thread_context();

void consume_memory(const int64_t size) const {
CHECK(doris::config::disable_memory_orphan_check || thread_mem_tracker()->label() != "Orphan");
CHECK(!doris::config::enable_memory_orphan_check ||
thread_mem_tracker()->label() != "Orphan");
thread_mem_tracker_mgr->consume(size, skip_large_memory_check);
}

Expand Down Expand Up @@ -289,7 +290,8 @@ class QueryThreadContext {
void init() {
DCHECK(doris::pthread_context_ptr_init);
ThreadContext* thread_context = doris::thread_context();
CHECK(doris::config::disable_memory_orphan_check || thread_context->thread_mem_tracker()->label() != "Orphan");
CHECK(!doris::config::enable_memory_orphan_check ||
thread_context->thread_mem_tracker()->label() != "Orphan");
query_id = thread_context->task_id();
query_mem_tracker = thread_context->thread_mem_tracker_mgr->limiter_mem_tracker();
}
Expand Down Expand Up @@ -385,25 +387,27 @@ class AddThreadMemTrackerConsumerByHook {
#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \
do { \
if (is_thread_context_init()) { \
CHECK(doris::config::disable_memory_orphan_check || doris::thread_context()->thread_mem_tracker()->label() != "Orphan"); \
CHECK(!doris::config::enable_memory_orphan_check || \
doris::thread_context()->thread_mem_tracker()->label() != "Orphan"); \
doris::thread_context() \
->thread_mem_tracker_mgr->limiter_mem_tracker_raw() \
->transfer_to(size, tracker); \
} else { \
CHECK(doris::config::disable_memory_orphan_check); \
CHECK(!doris::config::enable_memory_orphan_check); \
doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->transfer_to(size, tracker); \
} \
} while (0)

#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \
do { \
if (is_thread_context_init()) { \
CHECK(doris::config::disable_memory_orphan_check || doris::thread_context()->thread_mem_tracker()->label() != "Orphan"); \
CHECK(!doris::config::enable_memory_orphan_check || \
doris::thread_context()->thread_mem_tracker()->label() != "Orphan"); \
tracker->transfer_to( \
size, \
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()); \
} else { \
CHECK(doris::config::disable_memory_orphan_check); \
CHECK(!doris::config::enable_memory_orphan_check); \
tracker->transfer_to(size, doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()); \
} \
} while (0)
Expand Down Expand Up @@ -449,7 +453,7 @@ class AddThreadMemTrackerConsumerByHook {
static_cast<doris::ThreadContext*>(bthread_getspecific(doris::btls_key)) \
->consume_memory(size); \
} else if (doris::ExecEnv::ready()) { \
CHECK(doris::config::disable_memory_orphan_check); \
CHECK(!doris::config::enable_memory_orphan_check); \
doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(size); \
} \
} while (0)
Expand Down

0 comments on commit ea1c824

Please sign in to comment.