Print MPPTask info when it's alive for a long time (#7489)
close #7351, close #7488
xzhangxian1008 authored Jun 2, 2023
1 parent 03f3012 commit fbfb4cb
Showing 8 changed files with 287 additions and 34 deletions.
2 changes: 2 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
@@ -309,6 +309,8 @@ namespace DB
F(type_merged_task, {{"type", "merged_task"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_mpp_task_manager, "The gauge of mpp task manager", Gauge, \
F(type_mpp_query_count, {"type", "mpp_query_count"})) \
M(tiflash_mpp_task_monitor, "Monitor the lifecycle of MPP Task", Gauge, \
F(type_longest_live_time, {"type", "longest_live_time"})) \
M(tiflash_exchange_queueing_data_bytes, "Total bytes of data contained in the queue", Gauge, \
F(type_send, {{"type", "send_queue"}}), \
F(type_receive, {{"type", "recv_queue"}})) \
15 changes: 15 additions & 0 deletions dbms/src/Flash/Mpp/MPPTask.cpp
@@ -85,6 +85,20 @@ void injectFailPointDuringRegisterTunnel(bool is_root_task)
}
} // namespace

void MPPTaskMonitorHelper::initAndAddself(MPPTaskManager * manager_, const String & task_unique_id_)
{
manager = manager_;
task_unique_id = task_unique_id_;
manager->addMonitoredTask(task_unique_id);
initialized = true;
}

MPPTaskMonitorHelper::~MPPTaskMonitorHelper()
{
if (initialized)
manager->removeMonitoredTask(task_unique_id);
}

MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
: meta(meta_)
, id(meta)
@@ -95,6 +109,7 @@ MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
, mpp_task_statistics(id, meta.address())
{
current_memory_tracker = nullptr;
mpp_task_monitor_helper.initAndAddself(manager, id.toString());
}

MPPTask::~MPPTask()
19 changes: 19 additions & 0 deletions dbms/src/Flash/Mpp/MPPTask.h
@@ -47,6 +47,22 @@ enum class AbortType
ONERROR,
};

// This class notifies the MPPTaskManager that the MPPTask has been completely destructed
class MPPTaskMonitorHelper
{
public:
MPPTaskMonitorHelper() = default;

~MPPTaskMonitorHelper();

void initAndAddself(MPPTaskManager * manager_, const String & task_unique_id_);

private:
MPPTaskManager * manager = nullptr;
String task_unique_id;
bool initialized = false;
};

class MPPTask : public std::enable_shared_from_this<MPPTask>
, private boost::noncopyable
{
@@ -113,6 +129,9 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
void setErrString(const String & message);

private:
// This member must stay in this position so that it is destructed at the proper time: members are destructed in reverse declaration order, so it outlives the members declared after it
MPPTaskMonitorHelper mpp_task_monitor_helper;

// To make sure dag_req is not destroyed before the mpp task ends.
tipb::DAGRequest dag_req;
mpp::TaskMeta meta;
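The placement comment above relies on C++ member destruction order: non-static data members are destroyed in reverse order of declaration, so a member declared before the others is destroyed last. A minimal standalone sketch (hypothetical types, not TiFlash code) illustrating why declaring the helper first lets it outlive the other members during destruction:

```cpp
#include <iostream>

struct MonitorHelper
{
    ~MonitorHelper() { std::cout << "helper destroyed last\n"; }
};

struct OtherMember
{
    ~OtherMember() { std::cout << "other member destroyed first\n"; }
};

struct Task
{
    // Declared first => destroyed last, mirroring mpp_task_monitor_helper in MPPTask.
    MonitorHelper helper;
    OtherMember other;
};

int main()
{
    Task task;
    // On destruction the output is:
    //   other member destroyed first
    //   helper destroyed last
}
```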
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/MPPTaskId.h
@@ -43,6 +43,7 @@ struct MPPQueryId
bool operator==(const MPPQueryId & rid) const;
bool operator!=(const MPPQueryId & rid) const;
bool operator<=(const MPPQueryId & rid) const;

String toString() const
{
return fmt::format("<query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}>", query_ts, local_query_id, server_id, start_ts);
12 changes: 11 additions & 1 deletion dbms/src/Flash/Mpp/MPPTaskManager.cpp
@@ -20,8 +20,11 @@
#include <fmt/core.h>

#include <magic_enum.hpp>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

namespace DB
{
@@ -34,8 +37,16 @@ extern const char pause_before_register_non_root_mpp_task[];
MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_)
: scheduler(std::move(scheduler_))
, log(Logger::get())
, monitor(std::make_shared<MPPTaskMonitor>(log))
{}

MPPTaskManager::~MPPTaskManager()
{
std::lock_guard lock(monitor->mu);
monitor->is_shutdown = true;
monitor->cv.notify_all();
}

MPPQueryTaskSetPtr MPPTaskManager::addMPPQueryTaskSet(const MPPQueryId & query_id)
{
auto ptr = std::make_shared<MPPQueryTaskSet>();
@@ -308,5 +319,4 @@ void MPPTaskManager::releaseThreadsFromScheduler(const int needed_threads)
std::lock_guard lock(mu);
scheduler->releaseThreadsThenSchedule(needed_threads, *this);
}

} // namespace DB
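The new destructor pairs with the monitor loop added in TMTContext.cpp further down: the loop sleeps on monitor->cv for up to 25 minutes at a time, and notify_all under the lock wakes it early so the detached monitor thread can exit promptly when the manager is torn down. A minimal standalone sketch of this periodic-worker-with-shutdown pattern (names are illustrative, not TiFlash APIs):

```cpp
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

struct PeriodicWorker
{
    std::mutex mu;
    std::condition_variable cv;
    bool is_shutdown = false;

    void run()
    {
        std::unique_lock lock(mu);
        while (true)
        {
            // Returns either after the interval elapses or when shutdown() notifies.
            cv.wait_for(lock, std::chrono::seconds(2));
            std::cout << "periodic check\n";
            if (is_shutdown)
                return;
        }
    }

    void shutdown()
    {
        {
            std::lock_guard lock(mu);
            is_shutdown = true;
        }
        cv.notify_all();
    }
};

int main()
{
    PeriodicWorker worker;
    std::thread monitor_thread([&worker] { worker.run(); });
    std::this_thread::sleep_for(std::chrono::seconds(5));
    worker.shutdown(); // wakes the worker early instead of waiting for the next tick
    monitor_thread.join();
}
```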
55 changes: 54 additions & 1 deletion dbms/src/Flash/Mpp/MPPTaskManager.h
@@ -23,7 +23,9 @@

#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <unordered_map>

namespace DB
{
@@ -59,6 +61,49 @@ using MPPQueryTaskSetPtr = std::shared_ptr<MPPQueryTaskSet>;
/// the uniqueness of the start ts when stale read or set snapshot
using MPPQueryMap = std::unordered_map<MPPQueryId, MPPQueryTaskSetPtr, MPPQueryIdHash>;

struct MPPTaskMonitor
{
public:
explicit MPPTaskMonitor(const LoggerPtr & log_)
: log(log_)
{}

void addMonitoredTask(const String & task_unique_id)
{
std::lock_guard lock(mu);
auto iter = monitored_tasks.find(task_unique_id);
if (iter != monitored_tasks.end())
{
LOG_WARNING(log, "task {} is repeatedly added to be monitored which is not an expected behavior!");
return;
}

monitored_tasks.insert(std::make_pair(task_unique_id, Stopwatch()));
}

void removeMonitoredTask(const String & task_unique_id)
{
std::lock_guard lock(mu);
auto iter = monitored_tasks.find(task_unique_id);
if (iter == monitored_tasks.end())
{
LOG_WARNING(log, "Unexpected behavior! task {} is not found in monitored_task.");
return;
}

monitored_tasks.erase(iter);
}

std::mutex mu;
std::condition_variable cv;
bool is_shutdown = false;
const LoggerPtr log;

// All created MPPTasks should be put into this map.
// A task can be removed from it only after the MPPTask has been completely destructed.
std::unordered_map<String, Stopwatch> monitored_tasks;
};

// MPPTaskManager holds all running mpp tasks. It's a single instance held in Context.
class MPPTaskManager : private boost::noncopyable
{
@@ -72,10 +117,18 @@ class MPPTaskManager : private boost::noncopyable

std::condition_variable cv;

std::shared_ptr<MPPTaskMonitor> monitor;

public:
explicit MPPTaskManager(MPPTaskSchedulerPtr scheduler);

~MPPTaskManager() = default;
~MPPTaskManager();

std::shared_ptr<MPPTaskMonitor> getMPPTaskMonitor() const { return monitor; }

void addMonitoredTask(const String & task_unique_id) { monitor->addMonitoredTask(task_unique_id); }

void removeMonitoredTask(const String & task_unique_id) { monitor->removeMonitoredTask(task_unique_id); }

MPPQueryTaskSetPtr getQueryTaskSetWithoutLock(const MPPQueryId & query_id);

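One design note on monitored_tasks: the monitor loop in TMTContext.cpp below copies the map under the mutex and releases the lock before formatting log lines, so a slow logging pass never blocks addMonitoredTask/removeMonitoredTask on the hot path. A minimal standalone sketch of that copy-then-inspect pattern (illustrative types; double stands in for Stopwatch):

```cpp
#include <iostream>
#include <mutex>
#include <string>
#include <unordered_map>

std::mutex mu;
// task id -> alive time in seconds (double stands in for Stopwatch here)
std::unordered_map<std::string, double> monitored_tasks;

void checkTasks()
{
    std::unordered_map<std::string, double> snapshot;
    {
        std::lock_guard lock(mu);
        snapshot = monitored_tasks; // copy taken under the lock
    } // lock released here, so the logging below never blocks add/remove
    for (const auto & [task_id, alive_secs] : snapshot)
        std::cout << task_id << " alive for " << alive_secs << " secs\n";
}

int main()
{
    {
        std::lock_guard lock(mu);
        monitored_tasks.emplace("task_1", 1802.4); // hypothetical entry
    }
    checkTasks();
}
```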
54 changes: 53 additions & 1 deletion dbms/src/Storages/Transaction/TMTContext.cpp
@@ -47,7 +47,9 @@ const int64_t DEFAULT_WAIT_REGION_READY_TIMEOUT_SEC = 20 * 60;

const int64_t DEFAULT_READ_INDEX_WORKER_TICK_MS = 10;

static SchemaSyncerPtr createSchemaSyncer(bool exist_pd_addr, bool for_unit_test, const KVClusterPtr & cluster, bool disaggregated_compute_mode)
namespace
{
SchemaSyncerPtr createSchemaSyncer(bool exist_pd_addr, bool for_unit_test, const KVClusterPtr & cluster, bool disaggregated_compute_mode)
{
// Doesn't need SchemaSyncer for tiflash_compute mode.
if (disaggregated_compute_mode)
@@ -72,6 +74,54 @@ static SchemaSyncerPtr createSchemaSyncer(bool exist_pd_addr, bool for_unit_test
std::make_shared<TiDBSchemaSyncer</*mock_getter*/ true, /*mock_mapper*/ false>>(cluster));
}

// Print a log for any MPPTask that hasn't been removed for over 25 minutes.
void checkLongLiveMPPTasks(const std::unordered_map<String, Stopwatch> & monitored_tasks, const LoggerPtr & log)
{
String log_info;
double longest_live_time = 0;
for (const auto & iter : monitored_tasks)
{
auto alive_time = iter.second.elapsedSeconds();
if (alive_time > longest_live_time)
longest_live_time = alive_time;
if (alive_time >= 1500)
log_info = fmt::format("{} <MPPTask is alive for {} secs, {}>", log_info, alive_time, iter.first);
}

if (!log_info.empty())
LOG_INFO(log, log_info);
GET_METRIC(tiflash_mpp_task_monitor, type_longest_live_time).Set(longest_live_time);
}

void monitorMPPTasks(std::shared_ptr<MPPTaskMonitor> monitor)
{
std::unique_lock lock(monitor->mu, std::defer_lock);
while (true)
{
lock.lock();

// Check MPPTasks every 25 minutes
monitor->cv.wait_for(lock, std::chrono::seconds(1500));

auto snapshot = monitor->monitored_tasks;
if (monitor->is_shutdown)
{
lock.unlock();
checkLongLiveMPPTasks(snapshot, monitor->log);
return;
}

lock.unlock();
checkLongLiveMPPTasks(snapshot, monitor->log);
}
}

void startMonitorMPPTaskThread(const MPPTaskManagerPtr & manager)
{
newThreadManager()->scheduleThenDetach(false, "MPPTask-Monitor", [monitor = manager->getMPPTaskMonitor()] { monitorMPPTasks(monitor); });
}
} // namespace

TMTContext::TMTContext(Context & context_, const TiFlashRaftConfig & raft_config, const pingcap::ClusterConfig & cluster_config)
: context(context_)
, kvstore(context_.getSharedContextDisagg()->isDisaggregatedComputeMode() && context_.getSharedContextDisagg()->use_autoscaler ? nullptr : std::make_shared<KVStore>(context))
Expand All @@ -93,6 +143,8 @@ TMTContext::TMTContext(Context & context_, const TiFlashRaftConfig & raft_config
, read_index_worker_tick_ms(DEFAULT_READ_INDEX_WORKER_TICK_MS)
, wait_region_ready_timeout_sec(DEFAULT_WAIT_REGION_READY_TIMEOUT_SEC)
{
startMonitorMPPTaskThread(mpp_task_manager);

if (!raft_config.pd_addrs.empty() && S3::ClientFactory::instance().isEnabled() && !context.getSharedContextDisagg()->isDisaggregatedComputeMode())
{
etcd_client = Etcd::Client::create(cluster->pd_client, cluster_config);
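For a concrete picture of the output, checkLongLiveMPPTasks concatenates one entry per long-lived task into a single INFO line and updates the gauge with the longest alive time. A small sketch reproducing the formatting (the task id string and the 1802.4-second value are made up; real ids come from MPPTaskId::toString()):

```cpp
#include <fmt/core.h>
#include <iostream>
#include <string>

int main()
{
    std::string log_info;
    // Hypothetical long-lived task, mirroring the loop in checkLongLiveMPPTasks above.
    const double alive_time = 1802.4; // seconds, just over the 1500-second threshold
    const std::string task_unique_id = "task_1"; // placeholder; not the real MPPTaskId format
    log_info = fmt::format("{} <MPPTask is alive for {} secs, {}>", log_info, alive_time, task_unique_id);
    std::cout << log_info << "\n";
    // Prints: " <MPPTask is alive for 1802.4 secs, task_1>"
}
```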