diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 435e30f6855..ff7c2a1c970 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -309,6 +309,8 @@ namespace DB F(type_merged_task, {{"type", "merged_task"}}, ExpBuckets{0.001, 2, 20})) \ M(tiflash_mpp_task_manager, "The gauge of mpp task manager", Gauge, \ F(type_mpp_query_count, {"type", "mpp_query_count"})) \ + M(tiflash_mpp_task_monitor, "Monitor the lifecycle of MPP Task", Gauge, \ + F(type_longest_live_time, {"type", "longest_live_time"}),) \ M(tiflash_exchange_queueing_data_bytes, "Total bytes of data contained in the queue", Gauge, \ F(type_send, {{"type", "send_queue"}}), \ F(type_receive, {{"type", "recv_queue"}})) \ diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 09d7a25c0b6..0013ae2780d 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -85,6 +85,20 @@ void injectFailPointDuringRegisterTunnel(bool is_root_task) } } // namespace +void MPPTaskMonitorHelper::initAndAddself(MPPTaskManager * manager_, const String & task_unique_id_) // Remembers manager_ and the task id, then registers the id via manager_->addMonitoredTask(). +{ + manager = manager_; + task_unique_id = task_unique_id_; + manager->addMonitoredTask(task_unique_id); + initialized = true; // From here on the destructor will call removeMonitoredTask(). +} + +MPPTaskMonitorHelper::~MPPTaskMonitorHelper() // Removes the task id from the manager; a helper on which initAndAddself() never ran does nothing. +{ + if (initialized) + manager->removeMonitoredTask(task_unique_id); +} + MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_) : meta(meta_) , id(meta) @@ -95,6 +109,7 @@ MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_) , mpp_task_statistics(id, meta.address()) { current_memory_tracker = nullptr; + mpp_task_monitor_helper.initAndAddself(manager, id.toString()); // Register this task under its string id. } MPPTask::~MPPTask() diff --git a/dbms/src/Flash/Mpp/MPPTask.h b/dbms/src/Flash/Mpp/MPPTask.h index 009d3703707..c1994930b42 100644 --- a/dbms/src/Flash/Mpp/MPPTask.h +++ b/dbms/src/Flash/Mpp/MPPTask.h @@ -47,6 +47,22 @@ enum class AbortType ONERROR, }; +// This 
class notifies the MPPTaskManager that this MPPTask has been completely destructed +class MPPTaskMonitorHelper +{ +public: + MPPTaskMonitorHelper() = default; + + ~MPPTaskMonitorHelper(); // Deregisters the task if initAndAddself() was called. + + void initAndAddself(MPPTaskManager * manager_, const String & task_unique_id_); // Registers task_unique_id_ with manager_. + +private: + MPPTaskManager * manager = nullptr; + String task_unique_id; + bool initialized = false; // Set by initAndAddself(); gates the deregistration in the destructor. +}; + class MPPTask : public std::enable_shared_from_this , private boost::noncopyable { @@ -113,6 +129,9 @@ class MPPTask : public std::enable_shared_from_this void setErrString(const String & message); private: + // Keep this member ahead of the members below: C++ destroys members in reverse declaration order, so this helper is destroyed after them and only then deregisters the task + MPPTaskMonitorHelper mpp_task_monitor_helper; + // To make sure dag_req is not destroyed before the mpp task ends. tipb::DAGRequest dag_req; mpp::TaskMeta meta; diff --git a/dbms/src/Flash/Mpp/MPPTaskId.h b/dbms/src/Flash/Mpp/MPPTaskId.h index eb81425c50c..8f455076446 100644 --- a/dbms/src/Flash/Mpp/MPPTaskId.h +++ b/dbms/src/Flash/Mpp/MPPTaskId.h @@ -43,6 +43,7 @@ struct MPPQueryId bool operator==(const MPPQueryId & rid) const; bool operator!=(const MPPQueryId & rid) const; bool operator<=(const MPPQueryId & rid) const; + String toString() const { return fmt::format("", query_ts, local_query_id, server_id, start_ts); diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index b3ad98b5d15..6d28a2c23b3 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -20,8 +20,11 @@ #include #include +#include +#include #include #include +#include namespace DB { @@ -34,8 +37,16 @@ extern const char pause_before_register_non_root_mpp_task[]; MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_) : scheduler(std::move(scheduler_)) , log(Logger::get()) + , monitor(std::make_shared<MPPTaskMonitor>(log)) {} +MPPTaskManager::~MPPTaskManager() +{ + std::lock_guard lock(monitor->mu); // is_shutdown is read by the monitor thread under the same mutex. + monitor->is_shutdown = true; + monitor->cv.notify_all(); // Wake the monitor thread out of its timed wait. +} + 
MPPQueryTaskSetPtr MPPTaskManager::addMPPQueryTaskSet(const MPPQueryId & query_id) { auto ptr = std::make_shared(); @@ -308,5 +319,4 @@ void MPPTaskManager::releaseThreadsFromScheduler(const int needed_threads) std::lock_guard lock(mu); scheduler->releaseThreadsThenSchedule(needed_threads, *this); } - } // namespace DB diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index b986d8ae866..fd383ed5bd6 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -23,7 +23,9 @@ #include #include +#include #include +#include namespace DB { @@ -59,6 +61,49 @@ using MPPQueryTaskSetPtr = std::shared_ptr; /// the uniqueness of the start ts when stale read or set snapshot using MPPQueryMap = std::unordered_map; +struct MPPTaskMonitor // Registry of monitored MPPTask ids, each paired with a Stopwatch created at registration; every access goes through mu. +{ +public: + explicit MPPTaskMonitor(const LoggerPtr & log_) + : log(log_) + {} + + void addMonitoredTask(const String & task_unique_id) // Registers the id; a duplicate is logged and the existing entry is kept. + { + std::lock_guard lock(mu); + auto iter = monitored_tasks.find(task_unique_id); + if (iter != monitored_tasks.end()) + { + LOG_WARNING(log, "task {} is repeatedly added to be monitored which is not an expected behavior!", task_unique_id); + return; + } + + monitored_tasks.insert(std::make_pair(task_unique_id, Stopwatch())); + } + + void removeMonitoredTask(const String & task_unique_id) // Drops the id; an unknown id is logged and otherwise ignored. + { + std::lock_guard lock(mu); + auto iter = monitored_tasks.find(task_unique_id); + if (iter == monitored_tasks.end()) + { + LOG_WARNING(log, "Unexpected behavior! task {} is not found in monitored_task.", task_unique_id); + return; + } + + monitored_tasks.erase(iter); + } + + std::mutex mu; + std::condition_variable cv; // Notified by ~MPPTaskManager after it sets is_shutdown. + bool is_shutdown = false; + const LoggerPtr log; + + // All created MPPTasks should be put into this variable. + // Only when the MPPTask is completely destructed, the task can be removed from it. + std::unordered_map<String, Stopwatch> monitored_tasks; +}; + // MPPTaskManger holds all running mpp tasks. It's a single instance holden in Context. 
class MPPTaskManager : private boost::noncopyable { @@ -72,10 +117,18 @@ class MPPTaskManager : private boost::noncopyable std::condition_variable cv; + std::shared_ptr<MPPTaskMonitor> monitor; + public: explicit MPPTaskManager(MPPTaskSchedulerPtr scheduler); - ~MPPTaskManager() = default; + ~MPPTaskManager(); + + std::shared_ptr<MPPTaskMonitor> getMPPTaskMonitor() const { return monitor; } // Returns a shared_ptr copy, so the caller co-owns the monitor state. + + void addMonitoredTask(const String & task_unique_id) { monitor->addMonitoredTask(task_unique_id); } + + void removeMonitoredTask(const String & task_unique_id) { monitor->removeMonitoredTask(task_unique_id); } MPPQueryTaskSetPtr getQueryTaskSetWithoutLock(const MPPQueryId & query_id); diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index b1de0210ea0..877e8635e45 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -47,7 +47,9 @@ const int64_t DEFAULT_WAIT_REGION_READY_TIMEOUT_SEC = 20 * 60; const int64_t DEFAULT_READ_INDEX_WORKER_TICK_MS = 10; -static SchemaSyncerPtr createSchemaSyncer(bool exist_pd_addr, bool for_unit_test, const KVClusterPtr & cluster, bool disaggregated_compute_mode) +namespace +{ +SchemaSyncerPtr createSchemaSyncer(bool exist_pd_addr, bool for_unit_test, const KVClusterPtr & cluster, bool disaggregated_compute_mode) { // Doesn't need SchemaSyncer for tiflash_compute mode. if (disaggregated_compute_mode) @@ -72,6 +74,54 @@ static SchemaSyncerPtr createSchemaSyncer(bool exist_pd_addr, bool for_unit_test std::make_shared>(cluster)); } +// Logs every MPPTask that has been monitored for at least 1500 seconds (25 minutes) and publishes the longest live time as a gauge. 
+void checkLongLiveMPPTasks(const std::unordered_map<String, Stopwatch> & monitored_tasks, const LoggerPtr & log) +{ + String log_info; + double longest_live_time = 0; + for (const auto & iter : monitored_tasks) + { + auto alive_time = iter.second.elapsedSeconds(); + if (alive_time > longest_live_time) + longest_live_time = alive_time; + if (alive_time >= 1500) + log_info = fmt::format("{} [alive {}s: {}]", log_info, alive_time, iter.first); + } + + if (!log_info.empty()) + LOG_INFO(log, log_info); + GET_METRIC(tiflash_mpp_task_monitor, type_longest_live_time).Set(longest_live_time); // Set on every pass; 0 when no task is monitored. +} + +void monitorMPPTasks(std::shared_ptr<MPPTaskMonitor> monitor) // Thread body: checks the monitored tasks after every wait and returns after one last check once is_shutdown is set. +{ + std::unique_lock lock(monitor->mu, std::defer_lock); + while (true) + { + lock.lock(); + + // Check MPPTasks every 25 minutes; the predicate ends the wait at once when shutdown was already flagged (no lost wake-up) and ignores spurious wake-ups + monitor->cv.wait_for(lock, std::chrono::seconds(1500), [&monitor] { return monitor->is_shutdown; }); + + auto snapshot = monitor->monitored_tasks; // Copy under the lock so the scan below runs without holding mu. + if (monitor->is_shutdown) + { + lock.unlock(); + checkLongLiveMPPTasks(snapshot, monitor->log); + return; + } + + lock.unlock(); + checkLongLiveMPPTasks(snapshot, monitor->log); + } +} + +void startMonitorMPPTaskThread(const MPPTaskManagerPtr & manager) // The lambda captures the monitor shared_ptr by value, so the monitor state stays alive while the thread runs. +{ + newThreadManager()->scheduleThenDetach(false, "MPPTask-Moniter", [monitor = manager->getMPPTaskMonitor()] { monitorMPPTasks(monitor); }); +} +} // namespace + TMTContext::TMTContext(Context & context_, const TiFlashRaftConfig & raft_config, const pingcap::ClusterConfig & cluster_config) : context(context_) , kvstore(context_.getSharedContextDisagg()->isDisaggregatedComputeMode() && context_.getSharedContextDisagg()->use_autoscaler ? 
nullptr : std::make_shared(context)) @@ -93,6 +143,8 @@ TMTContext::TMTContext(Context & context_, const TiFlashRaftConfig & raft_config , read_index_worker_tick_ms(DEFAULT_READ_INDEX_WORKER_TICK_MS) , wait_region_ready_timeout_sec(DEFAULT_WAIT_REGION_READY_TIMEOUT_SEC) { + startMonitorMPPTaskThread(mpp_task_manager); + if (!raft_config.pd_addrs.empty() && S3::ClientFactory::instance().isEnabled() && !context.getSharedContextDisagg()->isDisaggregatedComputeMode()) { etcd_client = Etcd::Client::create(cluster->pd_client, cluster_config); diff --git a/metrics/grafana/tiflash_summary.json b/metrics/grafana/tiflash_summary.json index 5079d212055..20b2362bf20 100644 --- a/metrics/grafana/tiflash_summary.json +++ b/metrics/grafana/tiflash_summary.json @@ -3935,6 +3935,107 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 0, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 52 + }, + "hiddenSeries": false, + "id": 199, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null as zero", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "tiflash_mpp_task_monitor{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{instance}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, 
+ "title": "Time of the Longest Live MPP Task", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:78", + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "$$hashKey": "object:79", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "aliasColors": {}, "bars": false, @@ -5220,7 +5321,7 @@ "h": 8, "w": 12, "x": 0, - "y": 38 + "y": 6 }, "hiddenSeries": false, "id": 41, @@ -5333,7 +5434,7 @@ "h": 8, "w": 12, "x": 12, - "y": 38 + "y": 6 }, "hiddenSeries": false, "id": 38, @@ -5491,7 +5592,7 @@ "h": 8, "w": 24, "x": 0, - "y": 46 + "y": 14 }, "hiddenSeries": false, "id": 40, @@ -5591,7 +5692,7 @@ "h": 5, "w": 12, "x": 0, - "y": 54 + "y": 22 }, "hiddenSeries": false, "id": 39, @@ -5694,7 +5795,7 @@ "h": 5, "w": 12, "x": 12, - "y": 54 + "y": 22 }, "hiddenSeries": false, "id": 42, @@ -5798,7 +5899,7 @@ "h": 5, "w": 12, "x": 0, - "y": 59 + "y": 27 }, "hiddenSeries": false, "id": 130, @@ -5901,7 +6002,7 @@ "h": 5, "w": 12, "x": 12, - "y": 59 + "y": 27 }, "hiddenSeries": false, "id": 131, @@ -6005,7 +6106,7 @@ "h": 7, "w": 8, "x": 0, - "y": 64 + "y": 32 }, "hiddenSeries": false, "id": 50, @@ -6139,7 +6240,7 @@ "h": 7, "w": 8, "x": 8, - "y": 64 + "y": 32 }, "hiddenSeries": false, "id": 22, @@ -6253,7 +6354,7 @@ "h": 7, "w": 8, "x": 16, - "y": 64 + "y": 32 }, "hiddenSeries": false, "id": 52, @@ -6355,7 +6456,7 @@ "h": 7, "w": 12, "x": 0, - "y": 71 + "y": 39 }, "hiddenSeries": false, "id": 46, @@ -6478,7 +6579,7 @@ "h": 7, "w": 12, "x": 12, - "y": 71 + "y": 39 }, "hiddenSeries": false, "id": 47, @@ -6602,7 +6703,7 @@ "h": 8, "w": 12, "x": 0, - "y": 78 + "y": 46 }, "height": "", "hiddenSeries": false, @@ -6732,7 +6833,7 @@ 
"h": 8, "w": 12, "x": 12, - "y": 78 + "y": 46 }, "height": "", "hiddenSeries": false, @@ -6860,7 +6961,7 @@ "h": 8, "w": 12, "x": 0, - "y": 86 + "y": 54 }, "hiddenSeries": false, "id": 84, @@ -6960,7 +7061,7 @@ "h": 8, "w": 12, "x": 12, - "y": 86 + "y": 54 }, "hiddenSeries": false, "id": 86, @@ -7092,7 +7193,7 @@ "h": 8, "w": 12, "x": 0, - "y": 94 + "y": 62 }, "hiddenSeries": false, "id": 132, @@ -7225,7 +7326,7 @@ "h": 8, "w": 12, "x": 12, - "y": 94 + "y": 62 }, "hiddenSeries": false, "id": 67, @@ -7339,7 +7440,7 @@ "h": 7, "w": 12, "x": 0, - "y": 102 + "y": 70 }, "hiddenSeries": false, "id": 169, @@ -7488,7 +7589,7 @@ "h": 7, "w": 12, "x": 12, - "y": 102 + "y": 70 }, "hiddenSeries": false, "id": 88, @@ -7687,7 +7788,7 @@ "h": 8, "w": 12, "x": 12, - "y": 109 + "y": 77 }, "hiddenSeries": false, "id": 168, @@ -8432,7 +8533,7 @@ "h": 8, "w": 12, "x": 0, - "y": 40 + "y": 8 }, "hiddenSeries": false, "id": 128, @@ -8575,7 +8676,7 @@ "h": 8, "w": 12, "x": 12, - "y": 40 + "y": 8 }, "hiddenSeries": false, "id": 129, @@ -8692,7 +8793,7 @@ "h": 8, "w": 12, "x": 0, - "y": 48 + "y": 16 }, "heatmap": {}, "hideZeroBuckets": true, @@ -8754,7 +8855,7 @@ "h": 8, "w": 12, "x": 12, - "y": 48 + "y": 16 }, "hiddenSeries": false, "id": 158, @@ -8890,7 +8991,7 @@ "h": 8, "w": 12, "x": 0, - "y": 56 + "y": 24 }, "hiddenSeries": false, "id": 163, @@ -8995,7 +9096,7 @@ "h": 8, "w": 12, "x": 12, - "y": 56 + "y": 24 }, "hiddenSeries": false, "id": 162, @@ -9115,7 +9216,7 @@ "h": 8, "w": 12, "x": 0, - "y": 64 + "y": 32 }, "hiddenSeries": false, "id": 164, @@ -9224,7 +9325,7 @@ "h": 8, "w": 12, "x": 12, - "y": 64 + "y": 32 }, "hiddenSeries": false, "id": 123, @@ -9346,7 +9447,7 @@ "h": 8, "w": 12, "x": 0, - "y": 72 + "y": 40 }, "height": "", "hiddenSeries": false, @@ -9472,7 +9573,7 @@ "y": 9 }, "hiddenSeries": false, - "id": 167, + "id": 166, "legend": { "alignAsTable": false, "avg": false,