Skip to content

Commit

Permalink
a
Browse files Browse the repository at this point in the history
Signed-off-by: CalvinNeo <calvinneo1995@gmail.com>
  • Loading branch information
CalvinNeo committed Jun 4, 2024
1 parent 9929b19 commit 9309241
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 0 deletions.
33 changes: 33 additions & 0 deletions dbms/src/Common/MemoryAllocTrace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/MemoryAllocTrace.h>
#include <common/config_common.h> // Included for `USE_JEMALLOC`
#include <Common/setThreadName.h>

#if USE_JEMALLOC
#include <jemalloc/jemalloc.h>
Expand All @@ -35,4 +36,36 @@ std::tuple<uint64_t *, uint64_t *> getAllocDeallocPtr()
return std::make_tuple(nullptr, nullptr);
#endif
}

/// Fetches the allocation/deallocation counter pointers once, at construction.
/// Either pointer may be null (getAllocDeallocPtr() has a path returning {nullptr, nullptr}),
/// which is why begin()/end() check them before dereferencing.
ThreadStackAllocTracker::ThreadStackAllocTracker()
{
    auto [allocated_counter, deallocated_counter] = getAllocDeallocPtr();
    alloc_ptr = allocated_counter;
    dealloc_ptr = deallocated_counter;
}

void ThreadStackAllocTracker::begin() {
begin_alloc = 0;
begin_dealloc = 0;
if likely(alloc_ptr && dealloc_ptr) {
begin_alloc = *alloc_ptr;
begin_dealloc = *dealloc_ptr;
}
}
std::pair<uint64_t, uint64_t> ThreadStackAllocTracker::end() {
if likely(alloc_ptr && dealloc_ptr) {
uint64_t delta_alloc = *alloc_ptr - begin_alloc;
uint64_t delta_dealloc = *dealloc_ptr - begin_dealloc;
total_alloc += delta_alloc;
total_dealloc += delta_dealloc;
return std::make_pair(delta_alloc, delta_dealloc);
}
return std::make_pair(0, 0);
}

/// Forwards to getThreadName() and returns its result.
/// NOTE(review): presumably this is the name of the calling thread — confirm against setThreadName.h.
std::string ThreadStackAllocTracker::threadName() const
{
    auto current_name = getThreadName();
    return current_name;
}

// Currently a no-op: the body is empty, so calling report() has no effect.
// NOTE(review): presumably intended to publish total_alloc/total_dealloc to metrics — TODO confirm and implement.
void ThreadStackAllocTracker::report() {

}

} // namespace DB
20 changes: 20 additions & 0 deletions dbms/src/Common/MemoryAllocTrace.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,28 @@
#pragma once

#include <tuple>
#include <common/types.h>
#include <common/defines.h>

namespace DB
{
std::tuple<uint64_t *, uint64_t *> getAllocDeallocPtr();

/// Measures how much the counters behind getAllocDeallocPtr() advance between a
/// begin()/end() pair, and accumulates those deltas into running totals.
/// NOTE(review): the counters are presumably jemalloc per-thread statistics — confirm.
struct ThreadStackAllocTracker
{
    ThreadStackAllocTracker();

    /// Name obtained from getThreadName().
    std::string threadName() const;

    /// Take the baseline snapshot for the next end().
    void begin();
    /// Return {allocated delta, deallocated delta} since begin() and add them to the totals.
    std::pair<uint64_t, uint64_t> end();
    void report();

private:
    // Counter locations; null when getAllocDeallocPtr() provides none.
    uint64_t * alloc_ptr{nullptr};
    uint64_t * dealloc_ptr{nullptr};
    // Sum of all deltas returned by end().
    UInt64 total_alloc{0};
    UInt64 total_dealloc{0};
    // Counter values captured by the most recent begin().
    UInt64 begin_alloc{0};
    UInt64 begin_dealloc{0};
};

} // namespace DB
35 changes: 35 additions & 0 deletions dbms/src/Common/TiFlashMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,39 @@ void TiFlashMetrics::registerStorageThreadMemory(const std::string & k)
}
}

/// Builds the metric label for a stack-tracked thread key:
/// "salloc_<k>" for MemoryAllocType::Alloc, "sdealloc_<k>" for any other type.
static std::string genStackPrefix(TiFlashMetrics::MemoryAllocType type, const std::string & k)
{
    const char * tag = (type == TiFlashMetrics::MemoryAllocType::Alloc) ? "salloc_" : "sdealloc_";
    return tag + k;
}

void TiFlashMetrics::registerStackThreadMemory(const std::string & k)
{
std::unique_lock lock(stack_thread_report_mtx);
{
auto prefix = genStackPrefix(TiFlashMetrics::MemoryAllocType::Alloc, k);
if unlikely (!registered_stack_thread_memory_usage_metrics.contains(prefix))
{
registered_stack_thread_memory_usage_metrics.emplace(
prefix,
&registered_stack_thread_memory_usage_family->Add({{"type", prefix}}));
}
}
{
auto prefix = genStackPrefix(TiFlashMetrics::MemoryAllocType::Dealloc, k);
if unlikely (!registered_stack_thread_memory_usage_metrics.contains(prefix))
{
registered_stack_thread_memory_usage_metrics.emplace(
prefix,
&registered_stack_thread_memory_usage_family->Add({{"type", prefix}}));
}
}
}

} // namespace DB
5 changes: 5 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,7 @@ class TiFlashMetrics
static constexpr auto async_metrics_prefix = "tiflash_system_asynchronous_metric_";
static constexpr auto raft_proxy_thread_memory_usage = "tiflash_raft_proxy_thread_memory_usage";
static constexpr auto storages_thread_memory_usage = "tiflash_storages_thread_memory_usage";
static constexpr auto stack_thread_memory_usage = "tiflash_stack_thread_memory_usage";

std::shared_ptr<prometheus::Registry> registry = std::make_shared<prometheus::Registry>();
// Here we add a ProcessCollector to collect cpu/rss/vsize/start_time information.
Expand Down Expand Up @@ -1169,6 +1170,10 @@ class TiFlashMetrics
std::shared_mutex storage_thread_report_mtx;
std::unordered_map<std::string, prometheus::Gauge *> registered_storage_thread_memory_usage_metrics;

prometheus::Family<prometheus::Gauge> * registered_stack_thread_memory_usage_family;
std::shared_mutex stack_thread_report_mtx;
std::unordered_map<std::string, prometheus::Gauge *> registered_stack_thread_memory_usage_metrics;

public:
#define MAKE_METRIC_MEMBER_M(family_name, help, type, ...) \
MetricFamily<prometheus::type> family_name \
Expand Down
30 changes: 30 additions & 0 deletions dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ struct ThreadInfoJealloc
}
};

/// Allocation totals recorded for one stack-tracked entry.
struct StackThreadInfoJealloc
{
    explicit StackThreadInfoJealloc(char aggregate_delimer_)
        : aggregate_delimer(aggregate_delimer_)
    {}

    /// Delimiter character supplied by the caller.
    /// NOTE(review): presumably used to aggregate entry names — confirm against the reporting code.
    char aggregate_delimer = '-';
    uint64_t allocated{0};
    uint64_t deallocated{0};

    /// Signed net amount: `allocated - deallocated`, negative when more was deallocated
    /// than allocated.
    ///
    /// The subtraction is done in unsigned (modular) arithmetic and converted once.
    /// This yields the same value as branching on the sign and negating, but avoids
    /// negating INT64_MIN (signed-overflow UB) when `deallocated - allocated` is exactly 2^63.
    int64_t remaining() const
    {
        return static_cast<int64_t>(allocated - deallocated);
    }
};

/// Works in two different ways:
/// NOTE in both ways, call reportThreadAllocInfo to register by `Reset` for every thread to be monitored.
/// And call reportThreadAllocInfo to deregister by `Remove` for every thread that is guaranteed to no longer be monitored.
Expand All @@ -79,6 +99,7 @@ class JointThreadInfoJeallocMap
{
public:
using AllocMap = std::unordered_map<std::string, ThreadInfoJealloc>;
using StackAllocMap = std::unordered_map<std::string, StackThreadInfoJealloc>;
JointThreadInfoJeallocMap();
~JointThreadInfoJeallocMap();
void recordThreadAllocInfo();
Expand Down Expand Up @@ -106,11 +127,18 @@ class JointThreadInfoJeallocMap
uint64_t value,
char aggregate_delimer);

public: // StackThread
void reportStackThreadAllocInfo(
const std::string &,
StackThreadInfoJealloc,
char aggregate_delimer);

private:
/// Called periodically to submit the alloc info to TiFlashMetrics.
/// Note that this function relies on `TiFlashMetrics::instance` being alive.
void recordThreadAllocInfoForProxy();
void recordThreadAllocInfoForStorage();
void recordStackThreadAllocInfo();

/// Note that this function relies on `TiFlashMetrics::instance` being alive.
void reportThreadAllocInfoImpl(
Expand All @@ -122,8 +150,10 @@ class JointThreadInfoJeallocMap

private:
mutable std::shared_mutex memory_allocation_mut;
mutable std::shared_mutex stack_memory_allocation_mut;
AllocMap proxy_map;
AllocMap storage_map;
StackAllocMap stack_map;

bool is_terminated{false};
mutable std::mutex monitoring_mut;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/KVStore/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ extern const char pause_passive_flush_before_persist_region[];
extern const char force_not_clean_fap_on_destroy[];
} // namespace FailPoints

/// Allocation tracker with one instance per thread (`thread_local`).
/// `KVStore::persistRegion` calls begin() on entry and end() on scope exit.
/// NOTE(review): no `extern thread_local` declaration for this variable is visible here —
/// confirm that every translation unit using it can see one.
thread_local ThreadStackAllocTracker mem_tracker_kvs_memtable_persistence;

KVStore::KVStore(Context & context)
: region_persister(
context.getSharedContextDisagg()->isDisaggregatedComputeMode() ? nullptr
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <Storages/KVStore/MultiRaft/RegionManager.h>
#include <Storages/KVStore/MultiRaft/RegionRangeKeys.h>
#include <Storages/KVStore/StorageEngineType.h>
#include <Common/MemoryAllocTrace.h>

#include <condition_variable>
#include <magic_enum.hpp>
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ void KVStore::persistRegion(
"try access to region_persister without initialization, stack={}",
StackTrace().toString());

mem_tracker_kvs_memtable_persistence.begin();
SCOPE_EXIT({ mem_tracker_kvs_memtable_persistence.end(); });

auto reason_id = magic_enum::enum_underlying(reason);
std::string caller = fmt::format("{} {}", PersistRegionReasonMap[reason_id], extra_msg);
LOG_INFO(
Expand Down

0 comments on commit 9309241

Please sign in to comment.