diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index abe918d5badd17..85cff4ef2cc684 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -68,6 +68,8 @@ using std::vector; namespace doris { +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(agent_task_queue_size, MetricUnit::NOUNIT); + const uint32_t TASK_FINISH_MAX_RETRY = 3; const uint32_t PUBLISH_VERSION_MAX_RETRY = 3; const uint32_t REPORT_TASK_WORKER_COUNT = 1; @@ -81,8 +83,7 @@ FrontendServiceClientCache TaskWorkerPool::_master_service_client_cache; TaskWorkerPool::TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* env, const TMasterInfo& master_info) - : _name(strings::Substitute("TaskWorkerPool.$0", TYPE_STRING(task_worker_type))), - _master_info(master_info), + : _master_info(master_info), _agent_utils(new AgentUtils()), _master_client(new MasterServerClient(_master_info, &_master_service_client_cache)), _env(env), @@ -93,11 +94,24 @@ TaskWorkerPool::TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* e _backend.__set_host(BackendOptions::get_localhost()); _backend.__set_be_port(config::be_port); _backend.__set_http_port(config::webserver_port); + + string task_worker_type_name = TYPE_STRING(task_worker_type); + _name = strings::Substitute("TaskWorkerPool.$0", task_worker_type_name); + + _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( + task_worker_type_name, {{"type", task_worker_type_name}}); + REGISTER_ENTITY_HOOK_METRIC(_metric_entity, this, agent_task_queue_size, [this]() { + lock_guard lock(_worker_thread_lock); + return _tasks.size(); + }); } TaskWorkerPool::~TaskWorkerPool() { _stop_background_threads_latch.count_down(); stop(); + + DEREGISTER_ENTITY_HOOK_METRIC(_metric_entity, agent_task_queue_size); + DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity); } void TaskWorkerPool::start() { diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index cfbd6dfe0bcd61..04fbe781441804 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -221,6 +221,9 @@ class TaskWorkerPool { std::unique_ptr _thread_pool; std::deque _tasks; + std::shared_ptr _metric_entity; + UIntGauge* agent_task_queue_size; + uint32_t _worker_count; TaskWorkerType _task_worker_type; diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 5b76db1040ccf3..cab2873e8fbd25 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -28,16 +28,19 @@ namespace doris { +#define REGISTER_ENTITY_HOOK_METRIC(entity, owner, metric, func) \ + owner->metric = (UIntGauge*)(entity->register_metric(&METRIC_##metric)); \ + entity->register_hook(#metric, [&]() { owner->metric->set_value(func()); }); + #define REGISTER_HOOK_METRIC(metric, func) \ - DorisMetrics::instance()->metric = \ - (UIntGauge*)(DorisMetrics::instance()->server_entity()->register_metric( \ - &METRIC_##metric)); \ - DorisMetrics::instance()->server_entity()->register_hook( \ - #metric, [&]() { DorisMetrics::instance()->metric->set_value(func()); }); - -#define DEREGISTER_HOOK_METRIC(name) \ - DorisMetrics::instance()->server_entity()->deregister_metric(&METRIC_##name); \ - DorisMetrics::instance()->server_entity()->deregister_hook(#name); + REGISTER_ENTITY_HOOK_METRIC(DorisMetrics::instance()->server_entity(), DorisMetrics::instance(), metric, func) + +#define DEREGISTER_ENTITY_HOOK_METRIC(entity, name) \ + entity->deregister_metric(&METRIC_##name); \ + entity->deregister_hook(#name); + +#define DEREGISTER_HOOK_METRIC(name) \ + DEREGISTER_ENTITY_HOOK_METRIC(DorisMetrics::instance()->server_entity(), name) class DorisMetrics { public: