Skip to content

Commit

Permalink
[fix](memory) Fix pipelinex submit task attach memory tracker (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Apr 17, 2024
1 parent 1e324b5 commit 6bbd9df
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 11 deletions.
4 changes: 2 additions & 2 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req
new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash;
auto mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
MemTrackerLimiter::Type::OTHER,
fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
Expand Down Expand Up @@ -249,7 +249,7 @@ void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& age
if (status.ok()) {
new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
auto mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
MemTrackerLimiter::Type::OTHER,
fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ CloudEngineCalcDeleteBitmapTask::CloudEngineCalcDeleteBitmapTask(
_cal_delete_bitmap_req(cal_delete_bitmap_req),
_error_tablet_ids(error_tablet_ids),
_succ_tablet_ids(succ_tablet_ids) {
_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE,
_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"CloudEngineCalcDeleteBitmapTask");
}

Expand Down Expand Up @@ -134,7 +134,7 @@ CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(
_transaction_id(transaction_id),
_version(version) {
_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
MemTrackerLimiter::Type::OTHER,
fmt::format("CloudTabletCalcDeleteBitmapTask#_transaction_id={}", _transaction_id));
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ EnginePublishVersionTask::EnginePublishVersionTask(
_succ_tablets(succ_tablets),
_discontinuous_version_tablets(discontinuous_version_tablets),
_table_id_to_num_delta_rows(table_id_to_num_delta_rows) {
_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE,
_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"TabletPublishTxnTask");
}

Expand Down Expand Up @@ -357,7 +357,7 @@ TabletPublishTxnTask::TabletPublishTxnTask(StorageEngine& engine,
_transaction_id(transaction_id),
_version(version),
_tablet_info(tablet_info),
_mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE,
_mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"TabletPublishTxnTask")) {
_stats.submit_time_us = MonotonicMicros();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/task/engine_publish_version_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class AsyncTabletPublishTask {
_partition_id(partition_id),
_transaction_id(transaction_id),
_version(version),
_mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE,
_mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"AsyncTabletPublishTask")) {
_stats.submit_time_us = MonotonicMicros();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/fold_constant_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Status FoldConstantExecutor::_init(const TQueryGlobals& query_globals,
fragment_params.params = params;
fragment_params.protocol_version = PaloInternalServiceVersion::V1;
_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
MemTrackerLimiter::Type::OTHER,
fmt::format("FoldConstant:query_id={}", print_id(_query_id)));
_runtime_state =
RuntimeState::create_unique(fragment_params.params, query_options, query_globals,
Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ std::string FragmentMgr::to_http_path(const std::string& file_name) {
Status FragmentMgr::trigger_pipeline_context_report(
const ReportStatusRequest req, std::shared_ptr<pipeline::PipelineFragmentContext>&& ctx) {
return _async_report_thread_pool->submit_func([this, req, ctx]() {
SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker);
coordinator_callback(req);
if (!req.done) {
ctx->refresh_next_report_time();
Expand Down Expand Up @@ -986,7 +987,10 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,

for (size_t i = 0; i < target_size; i++) {
RETURN_IF_ERROR(_thread_pool->submit_func([&, i]() {
prepare_status[i] = pre_and_submit(i);
{
SCOPED_ATTACH_TASK_WITH_ID(query_ctx->query_mem_tracker, params.query_id);
prepare_status[i] = pre_and_submit(i);
}
std::unique_lock<std::mutex> lock(m);
prepare_done++;
if (prepare_done == target_size) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
const auto& local_tablet_uid = local_tablet->tablet_uid();

std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::SCHEMA_CHANGE, fmt::format("IngestBinlog#TxnId={}", txn_id));
MemTrackerLimiter::Type::OTHER, fmt::format("IngestBinlog#TxnId={}", txn_id));
SCOPED_ATTACH_TASK(mem_tracker);

auto& request = arg->request;
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
const TFileScanRangeParams& params = file_scan_range.params;

std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
MemTrackerLimiter::Type::OTHER,
fmt::format("{}#{}", params.format_type, params.file_type));
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);

Expand Down

0 comments on commit 6bbd9df

Please sign in to comment.