Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](memory) Fix LRU cache deleter and memory tracking #32080

Merged
merged 7 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 17 additions & 21 deletions be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,14 @@ CloudTabletMgr::~CloudTabletMgr() = default;
Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_id,
bool warmup_data) {
// LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr`
struct Value {
class Value : public LRUCacheValueBase {
public:
Value(const std::shared_ptr<CloudTablet>& tablet, TabletMap& tablet_map)
: LRUCacheValueBase(CachePolicy::CacheType::CLOUD_TABLET_CACHE),
tablet(tablet),
tablet_map(tablet_map) {}
~Value() override { tablet_map.erase(tablet.get()); }

// FIXME(plat1ko): The ownership of tablet seems to belong to 'TabletMap', while `Value`
// only requires a reference.
std::shared_ptr<CloudTablet> tablet;
Expand All @@ -154,10 +161,9 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i

auto tablet_id_str = std::to_string(tablet_id);
CacheKey key(tablet_id_str);
auto* cache = _cache->cache();
auto* handle = cache->lookup(key);
auto* handle = _cache->lookup(key);
if (handle == nullptr) {
auto load_tablet = [this, cache, &key,
auto load_tablet = [this, &key,
warmup_data](int64_t tablet_id) -> std::shared_ptr<CloudTablet> {
TabletMetaSharedPtr tablet_meta;
auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta);
Expand All @@ -167,28 +173,18 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
}

auto tablet = std::make_shared<CloudTablet>(_engine, std::move(tablet_meta));
auto value = std::make_unique<Value>(Value {
.tablet = tablet,
.tablet_map = *_tablet_map,
});
auto value = std::make_unique<Value>(tablet, *_tablet_map);
// MUST sync stats to let compaction scheduler work correctly
st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), warmup_data);
if (!st.ok()) {
LOG(WARNING) << "failed to sync tablet " << tablet_id << ": " << st;
return nullptr;
}

auto deleter = [](const CacheKey& key, void* value) {
auto* value1 = reinterpret_cast<Value*>(value);
// tablet has been evicted, release it from `tablet_map`
value1->tablet_map.erase(value1->tablet.get());
delete value1;
};

auto* handle = cache->insert(key, value.release(), 1, deleter, CachePriority::NORMAL,
sizeof(CloudTablet));
auto* handle = _cache->insert(key, value.release(), 1, sizeof(CloudTablet),
CachePriority::NORMAL);
auto ret = std::shared_ptr<CloudTablet>(
tablet.get(), [cache, handle](...) { cache->release(handle); });
tablet.get(), [this, handle](...) { _cache->release(handle); });
_tablet_map->put(std::move(tablet));
return ret;
};
Expand All @@ -200,16 +196,16 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
return tablet;
}

CloudTablet* tablet_raw_ptr = reinterpret_cast<Value*>(cache->value(handle))->tablet.get();
CloudTablet* tablet_raw_ptr = reinterpret_cast<Value*>(_cache->value(handle))->tablet.get();
auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr,
[cache, handle](...) { cache->release(handle); });
[this, handle](...) { _cache->release(handle); });
return tablet;
}

void CloudTabletMgr::erase_tablet(int64_t tablet_id) {
auto tablet_id_str = std::to_string(tablet_id);
CacheKey key(tablet_id_str.data(), tablet_id_str.size());
_cache->cache()->erase(key);
_cache->erase(key);
}

void CloudTabletMgr::vacuum_stale_rowsets() {
Expand Down
23 changes: 8 additions & 15 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,16 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
}
std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
CacheKey key(key_str);
Cache::Handle* handle = cache()->lookup(key);
Cache::Handle* handle = lookup(key);

DeleteBitmapCacheValue* val =
handle == nullptr ? nullptr
: reinterpret_cast<DeleteBitmapCacheValue*>(cache()->value(handle));
handle == nullptr ? nullptr : reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
if (val) {
*delete_bitmap = val->delete_bitmap;
*rowset_ids = val->rowset_ids;
// must call release handle to reduce the reference count,
// otherwise there will be memory leak
cache()->release(handle);
release(handle);
} else {
LOG_INFO("cache missed when get delete bitmap")
.tag("txn_id", transaction_id)
Expand Down Expand Up @@ -111,17 +110,14 @@ void CloudTxnDeleteBitmapCache::set_tablet_txn_info(
CacheKey key(key_str);

auto val = new DeleteBitmapCacheValue(delete_bitmap, rowset_ids);
auto deleter = [](const CacheKey&, void* value) {
delete (DeleteBitmapCacheValue*)value; // Just delete to reclaim
};
size_t charge = sizeof(DeleteBitmapCacheValue);
for (auto& [k, v] : val->delete_bitmap->delete_bitmap) {
charge += v.getSizeInBytes();
}
auto handle = cache()->insert(key, val, charge, deleter, CachePriority::NORMAL);
auto* handle = insert(key, val, charge, charge, CachePriority::NORMAL);
// must call release handle to reduce the reference count,
// otherwise there will be memory leak
cache()->release(handle);
release(handle);
LOG_INFO("set txn related delete bitmap")
.tag("txn_id", transaction_id)
.tag("expiration", txn_expiration)
Expand All @@ -137,17 +133,14 @@ void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transactio
CacheKey key(key_str);

auto val = new DeleteBitmapCacheValue(delete_bitmap, rowset_ids);
auto deleter = [](const CacheKey&, void* value) {
delete (DeleteBitmapCacheValue*)value; // Just delete to reclaim
};
size_t charge = sizeof(DeleteBitmapCacheValue);
for (auto& [k, v] : val->delete_bitmap->delete_bitmap) {
charge += v.getSizeInBytes();
}
auto handle = cache()->insert(key, val, charge, deleter, CachePriority::NORMAL);
auto* handle = insert(key, val, charge, charge, CachePriority::NORMAL);
// must call release handle to reduce the reference count,
// otherwise there will be memory leak
cache()->release(handle);
release(handle);
LOG_INFO("update txn related delete bitmap")
.tag("txn_id", transaction_id)
.tag("tablt_id", tablet_id)
Expand All @@ -174,7 +167,7 @@ void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
std::string key_str = std::to_string(txn_iter->first.txn_id) + "/" +
std::to_string(txn_iter->first.tablet_id); // Cache key container
CacheKey cache_key(key_str);
cache()->erase(cache_key);
erase(cache_key);
_txn_map.erase(iter->second);
}
_expiration_txn.erase(iter);
Expand Down
7 changes: 5 additions & 2 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,16 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicy {
private:
void _clean_thread_callback();

struct DeleteBitmapCacheValue {
class DeleteBitmapCacheValue : public LRUCacheValueBase {
public:
DeleteBitmapPtr delete_bitmap;
// records rowsets calc in commit txn
RowsetIdUnorderedSet rowset_ids;

DeleteBitmapCacheValue(DeleteBitmapPtr delete_bitmap_, const RowsetIdUnorderedSet& ids_)
: delete_bitmap(std::move(delete_bitmap_)), rowset_ids(ids_) {}
: LRUCacheValueBase(CachePolicy::CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE),
delete_bitmap(std::move(delete_bitmap_)),
rowset_ids(ids_) {}
};

struct TxnKey {
Expand Down
48 changes: 7 additions & 41 deletions be/src/olap/lru_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include <string>

#include "gutil/bits.h"
#include "runtime/thread_context.h"
#include "util/doris_metrics.h"
#include "util/time.h"

Expand Down Expand Up @@ -350,34 +349,23 @@ bool LRUCache::_check_element_count_limit() {
}

Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
MemTrackerLimiter* tracker, CachePriority priority, size_t bytes) {
CachePriority priority) {
size_t handle_size = sizeof(LRUHandle) - 1 + key.size();
LRUHandle* e = reinterpret_cast<LRUHandle*>(malloc(handle_size));
auto* e = reinterpret_cast<LRUHandle*>(malloc(handle_size));
e->value = value;
e->deleter = deleter;
e->charge = charge;
e->key_length = key.size();
// if LRUCacheType::NUMBER, charge not add handle_size,
// because charge at this time is no longer the memory size, but an weight.
e->total_size = (_type == LRUCacheType::SIZE ? handle_size + charge : charge);
DCHECK(_type == LRUCacheType::SIZE || bytes != -1) << " _type " << _type;
// if LRUCacheType::NUMBER and bytes equals 0, such as some caches cannot accurately track memory size.
// cache mem tracker value divided by handle_size(106) will get the number of cache entries.
e->bytes = (_type == LRUCacheType::SIZE ? handle_size + charge : handle_size + bytes);
e->hash = hash;
e->refs = 2; // one for the returned handle, one for LRUCache.
e->next = e->prev = nullptr;
e->in_cache = true;
e->priority = priority;
e->mem_tracker = tracker;
e->type = _type;
memcpy(e->key_data, key.data(), key.size());
e->last_visit_time = UnixMillis();
// The memory of the parameter value should be recorded in the tls mem tracker,
// transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker.
THREAD_MEM_TRACKER_TRANSFER_TO(e->bytes, tracker);
DorisMetrics::instance()->lru_cache_memory_bytes->increment(e->bytes);
LRUHandle* to_remove_head = nullptr;
{
std::lock_guard l(_mutex);
Expand Down Expand Up @@ -464,7 +452,7 @@ PrunedInfo LRUCache::prune() {
int64_t pruned_size = 0;
while (to_remove_head != nullptr) {
++pruned_count;
pruned_size += to_remove_head->bytes;
pruned_size += to_remove_head->total_size;
LRUHandle* next = to_remove_head->next;
to_remove_head->free();
to_remove_head = next;
Expand Down Expand Up @@ -506,7 +494,7 @@ PrunedInfo LRUCache::prune_if(CachePrunePredicate pred, bool lazy_mode) {
int64_t pruned_size = 0;
while (to_remove_head != nullptr) {
++pruned_count;
pruned_size += to_remove_head->bytes;
pruned_size += to_remove_head->total_size;
LRUHandle* next = to_remove_head->next;
to_remove_head->free();
to_remove_head = next;
Expand Down Expand Up @@ -534,9 +522,6 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
_shards(nullptr),
_last_id(1),
_total_capacity(total_capacity) {
_mem_tracker = std::make_unique<MemTrackerLimiter>(
MemTrackerLimiter::Type::GLOBAL,
fmt::format("{}[{}]", name, lru_cache_type_string(type)));
CHECK(num_shards > 0) << "num_shards cannot be 0";
CHECK_EQ((num_shards & (num_shards - 1)), 0)
<< "num_shards should be power of two, but got " << num_shards;
Expand Down Expand Up @@ -594,11 +579,9 @@ ShardedLRUCache::~ShardedLRUCache() {
}

Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
CachePriority priority, size_t bytes) {
CachePriority priority) {
const uint32_t hash = _hash_slice(key);
return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, _mem_tracker.get(),
priority, bytes);
return _shards[_shard(hash)]->insert(key, hash, value, charge, priority);
}

Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) {
Expand All @@ -620,11 +603,6 @@ void* ShardedLRUCache::value(Handle* handle) {
return reinterpret_cast<LRUHandle*>(handle)->value;
}

// Expose the cached payload as a Slice: pointer is the stored value,
// length is the charge recorded at insert time.
Slice ShardedLRUCache::value_slice(Handle* handle) {
    auto* h = reinterpret_cast<LRUHandle*>(handle);
    return {(char*)h->value, h->charge};
}

// Hand out a process-unique id. A plain monotonically-increasing counter;
// relaxed ordering is sufficient since only uniqueness matters, not ordering
// with respect to other memory operations.
uint64_t ShardedLRUCache::new_id() {
    const uint64_t id = _last_id.fetch_add(1, std::memory_order_relaxed);
    return id;
}
Expand All @@ -649,10 +627,6 @@ PrunedInfo ShardedLRUCache::prune_if(CachePrunePredicate pred, bool lazy_mode) {
return pruned_info;
}

// Report the memory currently attributed to this cache by its tracker
// (aggregated across all shards).
int64_t ShardedLRUCache::mem_consumption() {
    const int64_t tracked_bytes = _mem_tracker->consumption();
    return tracked_bytes;
}

int64_t ShardedLRUCache::get_usage() {
size_t total_usage = 0;
for (int i = 0; i < _num_shards; i++) {
Expand Down Expand Up @@ -683,16 +657,13 @@ void ShardedLRUCache::update_cache_metrics() const {
}

Cache::Handle* DummyLRUCache::insert(const CacheKey& key, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
CachePriority priority, size_t bytes) {
CachePriority priority) {
size_t handle_size = sizeof(LRUHandle);
auto* e = reinterpret_cast<LRUHandle*>(malloc(handle_size));
e->value = value;
e->deleter = deleter;
e->charge = charge;
e->key_length = 0;
e->total_size = 0;
e->bytes = 0;
e->hash = 0;
e->refs = 1; // only one for the returned handle
e->next = e->prev = nullptr;
Expand All @@ -712,9 +683,4 @@ void* DummyLRUCache::value(Handle* handle) {
return reinterpret_cast<LRUHandle*>(handle)->value;
}

// Same contract as ShardedLRUCache::value_slice: wrap the stored value
// pointer and its charge in a non-owning Slice view.
Slice DummyLRUCache::value_slice(Handle* handle) {
    auto* entry = reinterpret_cast<LRUHandle*>(handle);
    return {(char*)entry->value, entry->charge};
}

} // namespace doris
Loading
Loading