Skip to content

Commit

Permalink
[Enhancement] add more metrics to help locate hotspot issues (StarRoc…
Browse files Browse the repository at this point in the history
…ks#53490)

Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
  • Loading branch information
silverbullet233 authored Feb 7, 2025
1 parent 5f4c5cb commit 000cdd2
Show file tree
Hide file tree
Showing 44 changed files with 557 additions and 100 deletions.
1 change: 1 addition & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ void calculate_metrics(void* arg_this) {
mem_metrics->clone_mem_bytes.value(), mem_metrics->consistency_mem_bytes.value(), datacache_mem_bytes,
mem_metrics->jit_cache_mem_bytes.value());

StarRocksMetrics::instance()->table_metrics_mgr()->cleanup();
nap_sleep(15, [daemon] { return daemon->stopped(); });
}
}
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ set(EXEC_FILES
pipeline/sort/spillable_partition_sort_sink_operator.cpp
pipeline/sort/local_parallel_merge_sort_source_operator.cpp
pipeline/sort/sort_context.cpp
pipeline/pipeline_metrics.cpp
pipeline/pipeline_driver_executor.cpp
pipeline/pipeline_driver_queue.cpp
pipeline/pipeline_driver_poller.cpp
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/pipeline/pipeline_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -669,8 +669,7 @@ void PipelineDriver::_try_to_release_buffer(RuntimeState* state, OperatorPtr& op
}
}

void PipelineDriver::finalize(RuntimeState* runtime_state, DriverState state, int64_t schedule_count,
int64_t execution_time) {
void PipelineDriver::finalize(RuntimeState* runtime_state, DriverState state) {
stop_timers();
int64_t time_spent = 0;
// The driver may be destructed after finalizing, so use a temporal driver to record
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/pipeline_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ class PipelineDriver {
void set_morsel_queue(MorselQueue* morsel_queue) { _morsel_queue = morsel_queue; }
Status prepare(RuntimeState* runtime_state);
virtual StatusOr<DriverState> process(RuntimeState* runtime_state, int worker_id);
void finalize(RuntimeState* runtime_state, DriverState state, int64_t schedule_count, int64_t execution_time);
void finalize(RuntimeState* runtime_state, DriverState state);
DriverAcct& driver_acct() { return _driver_acct; }
DriverState driver_state() const { return _state; }

Expand Down
32 changes: 17 additions & 15 deletions be/src/exec/pipeline/pipeline_driver_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <memory>

#include "agent/master_info.h"
#include "exec/pipeline/pipeline_metrics.h"
#include "exec/pipeline/stream_pipeline_driver.h"
#include "exec/workgroup/work_group.h"
#include "gutil/strings/substitute.h"
Expand All @@ -30,14 +31,19 @@
namespace starrocks::pipeline {

GlobalDriverExecutor::GlobalDriverExecutor(const std::string& name, std::unique_ptr<ThreadPool> thread_pool,
bool enable_resource_group, const CpuUtil::CpuIds& cpuids)
bool enable_resource_group, const CpuUtil::CpuIds& cpuids,
PipelineExecutorMetrics* metrics)
: Base("pip_exec_" + name),
_driver_queue(enable_resource_group ? std::unique_ptr<DriverQueue>(std::make_unique<WorkGroupDriverQueue>())
: std::make_unique<QuerySharedDriverQueue>()),
_driver_queue(enable_resource_group
? std::unique_ptr<DriverQueue>(
std::make_unique<WorkGroupDriverQueue>(metrics->get_driver_queue_metrics()))
: std::make_unique<QuerySharedDriverQueue>(metrics->get_driver_queue_metrics())),
_thread_pool(std::move(thread_pool)),
_blocked_driver_poller(new PipelineDriverPoller(name, _driver_queue.get(), cpuids)),
_blocked_driver_poller(
new PipelineDriverPoller(name, _driver_queue.get(), cpuids, metrics->get_poller_metrics())),
_exec_state_reporter(new ExecStateReporter(cpuids)),
_audit_statistics_reporter(new AuditStatisticsReporter()) {}
_audit_statistics_reporter(new AuditStatisticsReporter()),
_metrics(metrics->get_driver_executor_metrics()) {}

void GlobalDriverExecutor::close() {
_driver_queue->close();
Expand All @@ -53,13 +59,6 @@ void GlobalDriverExecutor::initialize(int num_threads) {
}
}

DriverExecutorMetrics GlobalDriverExecutor::metrics() const {
return {.schedule_count = _schedule_count.load(),
.driver_execution_ns = _driver_execution_ns.load(),
.driver_queue_len = static_cast<int64_t>(_driver_queue->size()),
.driver_poller_block_queue_len = static_cast<int64_t>(_blocked_driver_poller->num_drivers())};
}

void GlobalDriverExecutor::change_num_threads(int32_t num_threads) {
int32_t old_num_threads = 0;
if (!_num_threads_setter.adjust_expect_num(num_threads, &old_num_threads)) {
Expand All @@ -74,7 +73,7 @@ void GlobalDriverExecutor::change_num_threads(int32_t num_threads) {

void GlobalDriverExecutor::_finalize_driver(DriverRawPtr driver, RuntimeState* runtime_state, DriverState state) {
DCHECK(driver);
driver->finalize(runtime_state, state, _schedule_count, _driver_execution_ns);
driver->finalize(runtime_state, state);
}

void GlobalDriverExecutor::_worker_thread() {
Expand Down Expand Up @@ -112,7 +111,7 @@ void GlobalDriverExecutor::_worker_thread() {
auto* fragment_ctx = driver->fragment_ctx();

driver->increment_schedule_times();
_schedule_count++;
_metrics->driver_schedule_count.increment(1);

SCOPED_SET_TRACE_INFO(driver->driver_id(), query_ctx->query_id(), fragment_ctx->fragment_instance_id());

Expand Down Expand Up @@ -152,6 +151,7 @@ void GlobalDriverExecutor::_worker_thread() {

StatusOr<DriverState> maybe_state;
int64_t start_time = driver->get_active_time();
_metrics->exec_running_tasks.increment(1);
#ifdef NDEBUG
TRY_CATCH_ALL(maybe_state, driver->process(runtime_state, worker_id));
#else
Expand All @@ -163,7 +163,9 @@ void GlobalDriverExecutor::_worker_thread() {
Status status = maybe_state.status();
this->_driver_queue->update_statistics(driver);
int64_t end_time = driver->get_active_time();
_driver_execution_ns += end_time - start_time;
_metrics->driver_execution_time.increment(end_time - start_time);
_metrics->exec_running_tasks.increment(-1);
_metrics->exec_finished_tasks.increment(1);

// Check big query
if (!driver->is_query_never_expired() && status.ok() && driver->workgroup()) {
Expand Down
15 changes: 4 additions & 11 deletions be/src/exec/pipeline/pipeline_driver_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "exec/pipeline/pipeline_driver_poller.h"
#include "exec/pipeline/pipeline_driver_queue.h"
#include "exec/pipeline/pipeline_fwd.h"
#include "exec/pipeline/pipeline_metrics.h"
#include "exec/pipeline/query_context.h"
#include "runtime/runtime_state.h"
#include "util/factory_method.h"
Expand All @@ -34,12 +35,7 @@ namespace starrocks::pipeline {
class DriverExecutor;
using DriverExecutorPtr = std::shared_ptr<DriverExecutor>;

struct DriverExecutorMetrics {
int64_t schedule_count;
int64_t driver_execution_ns;
int64_t driver_queue_len;
int64_t driver_poller_block_queue_len;
};
class PipelineExecutorMetrics;

class DriverExecutor {
public:
Expand Down Expand Up @@ -72,16 +68,14 @@ class DriverExecutor {

virtual void bind_cpus(const CpuUtil::CpuIds& cpuids, const std::vector<CpuUtil::CpuIds>& borrowed_cpuids) = 0;

virtual DriverExecutorMetrics metrics() const = 0;

protected:
std::string _name;
};

class GlobalDriverExecutor final : public FactoryMethod<DriverExecutor, GlobalDriverExecutor> {
public:
GlobalDriverExecutor(const std::string& name, std::unique_ptr<ThreadPool> thread_pool, bool enable_resource_group,
const CpuUtil::CpuIds& cpuids);
const CpuUtil::CpuIds& cpuids, PipelineExecutorMetrics* metrics);
~GlobalDriverExecutor() override = default;
void initialize(int32_t num_threads) override;
void change_num_threads(int32_t num_threads) override;
Expand Down Expand Up @@ -111,8 +105,6 @@ class GlobalDriverExecutor final : public FactoryMethod<DriverExecutor, GlobalDr

void _finalize_epoch(DriverRawPtr driver, RuntimeState* runtime_state, DriverState state);

DriverExecutorMetrics metrics() const override;

private:
// The maximum duration that a driver could stay in local_driver_queue
static constexpr int64_t LOCAL_MAX_WAIT_TIME_SPENT_NS = 1'000'000L;
Expand All @@ -132,6 +124,7 @@ class GlobalDriverExecutor final : public FactoryMethod<DriverExecutor, GlobalDr
// metrics
std::unique_ptr<UIntGauge> _driver_queue_len;
std::unique_ptr<UIntGauge> _driver_poller_block_queue_len;
DriverExecutorMetrics* _metrics;
};

} // namespace starrocks::pipeline
5 changes: 3 additions & 2 deletions be/src/exec/pipeline/pipeline_driver_poller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <chrono>

#include "exec/pipeline/pipeline_fwd.h"
#include "exec/pipeline/pipeline_metrics.h"
#include "runtime/exec_env.h"
#include "util/time_guard.h"

Expand Down Expand Up @@ -195,7 +196,7 @@ void PipelineDriverPoller::add_blocked_driver(const DriverRawPtr driver) {

std::unique_lock<std::mutex> lock(_global_mutex);
_blocked_drivers.push_back(driver);
_num_drivers++;
_metrics->poller_block_queue_len.increment(1);
driver->_pending_timer_sw->reset();
driver->driver_acct().clean_local_queue_infos();
_cond.notify_one();
Expand Down Expand Up @@ -247,7 +248,7 @@ void PipelineDriverPoller::remove_blocked_driver(DriverList& local_blocked_drive
auto& driver = *driver_it;
driver->_pending_timer->update(driver->_pending_timer_sw->elapsed_time());
local_blocked_drivers.erase(driver_it++);
_num_drivers--;
_metrics->poller_block_queue_len.increment(-1);
}

void PipelineDriverPoller::on_cancel(DriverRawPtr driver, std::vector<DriverRawPtr>& ready_drivers,
Expand Down
11 changes: 5 additions & 6 deletions be/src/exec/pipeline/pipeline_driver_poller.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,19 @@ namespace starrocks::pipeline {

class PipelineDriverPoller;
using PipelineDriverPollerPtr = std::unique_ptr<PipelineDriverPoller>;
class PollerMetrics;

class PipelineDriverPoller {
public:
explicit PipelineDriverPoller(std::string name, DriverQueue* driver_queue, CpuUtil::CpuIds cpuids)
explicit PipelineDriverPoller(std::string name, DriverQueue* driver_queue, CpuUtil::CpuIds cpuids,
PollerMetrics* metrics)
: _name(std::move(name)),
_cpud_ids(std::move(cpuids)),
_driver_queue(driver_queue),
_polling_thread(nullptr),
_is_polling_thread_initialized(false),
_is_shutdown(false),
_num_drivers(0) {}
_metrics(metrics) {}

~PipelineDriverPoller() { shutdown(); }

Expand All @@ -56,9 +58,6 @@ class PipelineDriverPoller {
size_t activate_parked_driver(const ConstDriverPredicator& predicate_func);
size_t calculate_parked_driver(const ConstDriverPredicator& predicate_func) const;

// only used for collect metrics
size_t num_drivers() const { return _num_drivers; }

void for_each_driver(const ConstDriverConsumer& call) const;

void bind_cpus(const CpuUtil::CpuIds& cpuids);
Expand Down Expand Up @@ -88,6 +87,6 @@ class PipelineDriverPoller {
mutable std::mutex _global_parked_mutex;
DriverList _parked_drivers;

std::atomic<size_t> _num_drivers;
PollerMetrics* _metrics;
};
} // namespace starrocks::pipeline
12 changes: 11 additions & 1 deletion be/src/exec/pipeline/pipeline_driver_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,21 @@

#include "exec/pipeline/pipeline_driver_queue.h"

#include "exec/pipeline/pipeline_metrics.h"
#include "exec/pipeline/source_operator.h"
#include "exec/workgroup/work_group.h"
#include "gutil/strings/substitute.h"

namespace starrocks::pipeline {

/// QuerySharedDriverQueue.
QuerySharedDriverQueue::QuerySharedDriverQueue() {
QuerySharedDriverQueue::QuerySharedDriverQueue(DriverQueueMetrics* metrics) : FactoryMethod(metrics) {
double factor = 1;
for (int i = QUEUE_SIZE - 1; i >= 0; --i) {
// initialize factor for every sub queue,
// Higher priority queues have more execution time,
// so they have a larger factor.
_queues[i].set_metrics(metrics);
_queues[i].factor_for_normal = factor;
factor *= RATIO_OF_ADJACENT_QUEUE;
}
Expand Down Expand Up @@ -57,6 +59,7 @@ void QuerySharedDriverQueue::put_back(const DriverRawPtr driver) {
_cv.notify_one();
++_num_drivers;
}
_metrics->driver_queue_len.increment(1);
}

void QuerySharedDriverQueue::put_back(const std::vector<DriverRawPtr>& drivers) {
Expand All @@ -74,6 +77,7 @@ void QuerySharedDriverQueue::put_back(const std::vector<DriverRawPtr>& drivers)
_cv.notify_one();
}
_num_drivers += drivers.size();
_metrics->driver_queue_len.increment(drivers.size());
}

void QuerySharedDriverQueue::put_back_from_executor(const DriverRawPtr driver) {
Expand Down Expand Up @@ -121,6 +125,7 @@ StatusOr<DriverRawPtr> QuerySharedDriverQueue::take(const bool block) {
driver_ptr->set_in_ready(false);

--_num_drivers;
_metrics->driver_queue_len.increment(-1);
}
}

Expand Down Expand Up @@ -171,6 +176,7 @@ void SubQuerySharedDriverQueue::put(const DriverRawPtr driver) {
queue.emplace_back(driver);
}
num_drivers++;
_metrics->driver_queue_len.increment(1);
}

void SubQuerySharedDriverQueue::cancel(const DriverRawPtr driver) {
Expand All @@ -188,6 +194,7 @@ DriverRawPtr SubQuerySharedDriverQueue::take(const bool block) {
pending_cancel_queue.pop();
cancelled_set.insert(driver);
--num_drivers;
_metrics->driver_queue_len.increment(-1);
return driver;
}

Expand All @@ -199,6 +206,7 @@ DriverRawPtr SubQuerySharedDriverQueue::take(const bool block) {
cancelled_set.erase(iter);
} else {
--num_drivers;
_metrics->driver_queue_len.increment(-1);
return driver;
}
}
Expand Down Expand Up @@ -283,6 +291,7 @@ StatusOr<DriverRawPtr> WorkGroupDriverQueue::take(const bool block) {
auto maybe_driver = wg_entity->queue()->take(block);
if (maybe_driver.ok() && maybe_driver.value() != nullptr) {
--_num_drivers;
_metrics->driver_queue_len.increment(-1);
}
return maybe_driver;
}
Expand Down Expand Up @@ -352,6 +361,7 @@ void WorkGroupDriverQueue::_put_back(const DriverRawPtr driver) {
}

++_num_drivers;
_metrics->driver_queue_len.increment(1);

_cv.notify_one();
}
Expand Down
9 changes: 8 additions & 1 deletion be/src/exec/pipeline/pipeline_driver_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ namespace starrocks::pipeline {

class DriverQueue;
using DriverQueuePtr = std::unique_ptr<DriverQueue>;
class DriverQueueMetrics;

class DriverQueue {
public:
DriverQueue(DriverQueueMetrics* metrics) : _metrics(metrics) {}
virtual ~DriverQueue() = default;
virtual void close() = 0;

Expand All @@ -46,6 +48,8 @@ class DriverQueue {
bool empty() const { return size() == 0; }

virtual bool should_yield(const DriverRawPtr driver, int64_t unaccounted_runtime_ns) const = 0;

DriverQueueMetrics* _metrics;
};

// SubQuerySharedDriverQueue is used to store the driver waiting to be executed.
Expand Down Expand Up @@ -82,6 +86,7 @@ class SubQuerySharedDriverQueue {
inline bool empty() const { return num_drivers == 0; }

inline size_t size() const { return num_drivers; }
void set_metrics(DriverQueueMetrics* metrics) { _metrics = metrics; }

std::deque<DriverRawPtr> queue;
std::queue<DriverRawPtr> pending_cancel_queue;
Expand All @@ -93,13 +98,14 @@ class SubQuerySharedDriverQueue {

private:
std::atomic<int64_t> _accu_consume_time = 0;
DriverQueueMetrics* _metrics;
};

class QuerySharedDriverQueue : public FactoryMethod<DriverQueue, QuerySharedDriverQueue> {
friend class FactoryMethod<DriverQueue, QuerySharedDriverQueue>;

public:
QuerySharedDriverQueue();
QuerySharedDriverQueue(DriverQueueMetrics* metrics);
~QuerySharedDriverQueue() override = default;
void close() override;
void put_back(const DriverRawPtr driver) override;
Expand Down Expand Up @@ -149,6 +155,7 @@ class WorkGroupDriverQueue : public FactoryMethod<DriverQueue, WorkGroupDriverQu
friend class FactoryMethod<DriverQueue, WorkGroupDriverQueue>;

public:
WorkGroupDriverQueue(DriverQueueMetrics* metrics) : FactoryMethod(metrics) {}
~WorkGroupDriverQueue() override = default;
void close() override;

Expand Down
Loading

0 comments on commit 000cdd2

Please sign in to comment.