From b1cae7cc0083723adcb162d60f07e4d29a854a8c Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Wed, 13 Mar 2024 10:57:09 +0800 Subject: [PATCH] [fix](memory) Fix LRU cache deleter and memory tracking (#32080) In order to add common code to the value deleter of LRU cache, let all lru cache values inherit from LRUCacheValueBase class and tracking memory in destructor. --- be/src/cloud/cloud_tablet_mgr.cpp | 38 ++++---- .../cloud/cloud_txn_delete_bitmap_cache.cpp | 23 ++--- be/src/cloud/cloud_txn_delete_bitmap_cache.h | 7 +- be/src/olap/lru_cache.cpp | 48 ++-------- be/src/olap/lru_cache.h | 52 +++-------- be/src/olap/page_cache.cpp | 18 ++-- be/src/olap/page_cache.h | 39 ++++---- .../segment_v2/bitshuffle_page_pre_decoder.h | 7 +- be/src/olap/rowset/segment_v2/encoding_info.h | 4 +- .../segment_v2/inverted_index_cache.cpp | 36 +++----- .../rowset/segment_v2/inverted_index_cache.h | 26 +++--- be/src/olap/rowset/segment_v2/page_io.cpp | 17 ++-- be/src/olap/schema_cache.h | 23 +++-- be/src/olap/segment_loader.cpp | 20 ++--- be/src/olap/segment_loader.h | 8 +- be/src/olap/storage_engine.cpp | 16 ++-- be/src/olap/storage_engine.h | 5 +- be/src/olap/tablet_meta.cpp | 5 +- be/src/olap/tablet_meta.h | 7 +- be/src/olap/tablet_schema_cache.cpp | 21 +++-- be/src/olap/tablet_schema_cache.h | 6 +- be/src/olap/txn_manager.cpp | 21 ++--- be/src/olap/txn_manager.h | 7 ++ be/src/runtime/load_channel_mgr.cpp | 11 +-- be/src/runtime/memory/cache_manager.cpp | 9 +- be/src/runtime/memory/cache_manager.h | 29 ++++-- be/src/runtime/memory/cache_policy.cpp | 4 +- be/src/runtime/memory/cache_policy.h | 10 ++- be/src/runtime/memory/lru_cache_policy.h | 80 ++++++++++++++--- be/src/runtime/memory/lru_cache_value_base.h | 51 +++++++++++ be/src/service/point_query_executor.cpp | 19 ++-- be/src/service/point_query_executor.h | 49 +++++----- be/src/util/doris_metrics.cpp | 4 - be/src/util/doris_metrics.h | 2 - be/src/util/obj_lru_cache.cpp | 6 +- be/src/util/obj_lru_cache.h | 42 +++++---- .../exec/format/parquet/vparquet_reader.cpp | 2 +- be/test/olap/lru_cache_test.cpp | 90 +++++++++++-------- be/test/olap/page_cache_test.cpp | 43 +++++---- 39 files changed, 493 insertions(+), 412 deletions(-) create mode 100644 be/src/runtime/memory/lru_cache_value_base.h diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index daffc4585831ad..7fe86fedde3400 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -145,7 +145,14 @@ CloudTabletMgr::~CloudTabletMgr() = default; Result> CloudTabletMgr::get_tablet(int64_t tablet_id, bool warmup_data) { // LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr` - struct Value { + class Value : public LRUCacheValueBase { + public: + Value(const std::shared_ptr& tablet, TabletMap& tablet_map) + : LRUCacheValueBase(CachePolicy::CacheType::CLOUD_TABLET_CACHE), + tablet(tablet), + tablet_map(tablet_map) {} + ~Value() override { tablet_map.erase(tablet.get()); } + // FIXME(plat1ko): The ownership of tablet seems to belong to 'TabletMap', while `Value` // only requires a reference. std::shared_ptr tablet; @@ -154,10 +161,9 @@ Result> CloudTabletMgr::get_tablet(int64_t tablet_i auto tablet_id_str = std::to_string(tablet_id); CacheKey key(tablet_id_str); - auto* cache = _cache->cache(); - auto* handle = cache->lookup(key); + auto* handle = _cache->lookup(key); if (handle == nullptr) { - auto load_tablet = [this, cache, &key, + auto load_tablet = [this, &key, warmup_data](int64_t tablet_id) -> std::shared_ptr { TabletMetaSharedPtr tablet_meta; auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta); @@ -167,10 +173,7 @@ Result> CloudTabletMgr::get_tablet(int64_t tablet_i } auto tablet = std::make_shared(_engine, std::move(tablet_meta)); - auto value = std::make_unique(Value { - .tablet = tablet, - .tablet_map = *_tablet_map, - }); + auto value = std::make_unique(tablet, *_tablet_map); // MUST sync stats to let compaction scheduler work correctly st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), warmup_data); if (!st.ok()) { @@ -178,17 +181,10 @@ Result> CloudTabletMgr::get_tablet(int64_t tablet_i return nullptr; } - auto deleter = [](const CacheKey& key, void* value) { - auto* value1 = reinterpret_cast(value); - // tablet has been evicted, release it from `tablet_map` - value1->tablet_map.erase(value1->tablet.get()); - delete value1; - }; - - auto* handle = cache->insert(key, value.release(), 1, deleter, CachePriority::NORMAL, - sizeof(CloudTablet)); + auto* handle = _cache->insert(key, value.release(), 1, sizeof(CloudTablet), + CachePriority::NORMAL); auto ret = std::shared_ptr( - tablet.get(), [cache, handle](...) { cache->release(handle); }); + tablet.get(), [this, handle](...) { _cache->release(handle); }); _tablet_map->put(std::move(tablet)); return ret; }; @@ -200,16 +196,16 @@ Result> CloudTabletMgr::get_tablet(int64_t tablet_i return tablet; } - CloudTablet* tablet_raw_ptr = reinterpret_cast(cache->value(handle))->tablet.get(); + CloudTablet* tablet_raw_ptr = reinterpret_cast(_cache->value(handle))->tablet.get(); auto tablet = std::shared_ptr(tablet_raw_ptr, - [cache, handle](...) { cache->release(handle); }); + [this, handle](...) { _cache->release(handle); }); return tablet; } void CloudTabletMgr::erase_tablet(int64_t tablet_id) { auto tablet_id_str = std::to_string(tablet_id); CacheKey key(tablet_id_str.data(), tablet_id_str.size()); - _cache->cache()->erase(key); + _cache->erase(key); } void CloudTabletMgr::vacuum_stale_rowsets() { diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp index 08bc035770a398..6f0d90b37bbca0 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp @@ -69,17 +69,16 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info( } std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id); CacheKey key(key_str); - Cache::Handle* handle = cache()->lookup(key); + Cache::Handle* handle = lookup(key); DeleteBitmapCacheValue* val = - handle == nullptr ? nullptr - : reinterpret_cast(cache()->value(handle)); + handle == nullptr ? nullptr : reinterpret_cast(value(handle)); if (val) { *delete_bitmap = val->delete_bitmap; *rowset_ids = val->rowset_ids; // must call release handle to reduce the reference count, // otherwise there will be memory leak - cache()->release(handle); + release(handle); } else { LOG_INFO("cache missed when get delete bitmap") .tag("txn_id", transaction_id) @@ -111,17 +110,14 @@ void CloudTxnDeleteBitmapCache::set_tablet_txn_info( CacheKey key(key_str); auto val = new DeleteBitmapCacheValue(delete_bitmap, rowset_ids); - auto deleter = [](const CacheKey&, void* value) { - delete (DeleteBitmapCacheValue*)value; // Just delete to reclaim - }; size_t charge = sizeof(DeleteBitmapCacheValue); for (auto& [k, v] : val->delete_bitmap->delete_bitmap) { charge += v.getSizeInBytes(); } - auto handle = cache()->insert(key, val, charge, deleter, CachePriority::NORMAL); + auto* handle = insert(key, val, charge, charge, CachePriority::NORMAL); // must call release handle to reduce the reference count, // otherwise there will be memory leak - cache()->release(handle); + release(handle); LOG_INFO("set txn related delete bitmap") .tag("txn_id", transaction_id) .tag("expiration", txn_expiration) @@ -137,17 +133,14 @@ void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transactio CacheKey key(key_str); auto val = new DeleteBitmapCacheValue(delete_bitmap, rowset_ids); - auto deleter = [](const CacheKey&, void* value) { - delete (DeleteBitmapCacheValue*)value; // Just delete to reclaim - }; size_t charge = sizeof(DeleteBitmapCacheValue); for (auto& [k, v] : val->delete_bitmap->delete_bitmap) { charge += v.getSizeInBytes(); } - auto handle = cache()->insert(key, val, charge, deleter, CachePriority::NORMAL); + auto* handle = insert(key, val, charge, charge, CachePriority::NORMAL); // must call release handle to reduce the reference count, // otherwise there will be memory leak - cache()->release(handle); + release(handle); LOG_INFO("update txn related delete bitmap") .tag("txn_id", transaction_id) .tag("tablt_id", tablet_id) @@ -174,7 +167,7 @@ void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() { std::string key_str = std::to_string(txn_iter->first.txn_id) + "/" + std::to_string(txn_iter->first.tablet_id); // Cache key container CacheKey cache_key(key_str); - cache()->erase(cache_key); + erase(cache_key); _txn_map.erase(iter->second); } _expiration_txn.erase(iter); diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h b/be/src/cloud/cloud_txn_delete_bitmap_cache.h index c84e19a765f2d9..3b1d1d1d85760d 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.h +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h @@ -56,13 +56,16 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicy { private: void _clean_thread_callback(); - struct DeleteBitmapCacheValue { + class DeleteBitmapCacheValue : public LRUCacheValueBase { + public: DeleteBitmapPtr delete_bitmap; // records rowsets calc in commit txn RowsetIdUnorderedSet rowset_ids; DeleteBitmapCacheValue(DeleteBitmapPtr delete_bitmap_, const RowsetIdUnorderedSet& ids_) - : delete_bitmap(std::move(delete_bitmap_)), rowset_ids(ids_) {} + : LRUCacheValueBase(CachePolicy::CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE), + delete_bitmap(std::move(delete_bitmap_)), + rowset_ids(ids_) {} }; struct TxnKey { diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp index 95114ec54c9bc1..031082f6da8902 100644 --- a/be/src/olap/lru_cache.cpp +++ b/be/src/olap/lru_cache.cpp @@ -12,7 +12,6 @@ #include #include "gutil/bits.h" -#include "runtime/thread_context.h" #include "util/doris_metrics.h" #include "util/time.h" @@ -350,34 +349,23 @@ bool LRUCache::_check_element_count_limit() { } Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value, size_t charge, - void (*deleter)(const CacheKey& key, void* value), - MemTrackerLimiter* tracker, CachePriority priority, size_t bytes) { + CachePriority priority) { size_t handle_size = sizeof(LRUHandle) - 1 + key.size(); - LRUHandle* e = reinterpret_cast(malloc(handle_size)); + auto* e = reinterpret_cast(malloc(handle_size)); e->value = value; - e->deleter = deleter; e->charge = charge; e->key_length = key.size(); // if LRUCacheType::NUMBER, charge not add handle_size, // because charge at this time is no longer the memory size, but an weight. e->total_size = (_type == LRUCacheType::SIZE ? handle_size + charge : charge); - DCHECK(_type == LRUCacheType::SIZE || bytes != -1) << " _type " << _type; - // if LRUCacheType::NUMBER and bytes equals 0, such as some caches cannot accurately track memory size. - // cache mem tracker value divided by handle_size(106) will get the number of cache entries. - e->bytes = (_type == LRUCacheType::SIZE ? handle_size + charge : handle_size + bytes); e->hash = hash; e->refs = 2; // one for the returned handle, one for LRUCache. e->next = e->prev = nullptr; e->in_cache = true; e->priority = priority; - e->mem_tracker = tracker; e->type = _type; memcpy(e->key_data, key.data(), key.size()); e->last_visit_time = UnixMillis(); - // The memory of the parameter value should be recorded in the tls mem tracker, - // transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker. - THREAD_MEM_TRACKER_TRANSFER_TO(e->bytes, tracker); - DorisMetrics::instance()->lru_cache_memory_bytes->increment(e->bytes); LRUHandle* to_remove_head = nullptr; { std::lock_guard l(_mutex); @@ -464,7 +452,7 @@ PrunedInfo LRUCache::prune() { int64_t pruned_size = 0; while (to_remove_head != nullptr) { ++pruned_count; - pruned_size += to_remove_head->bytes; + pruned_size += to_remove_head->total_size; LRUHandle* next = to_remove_head->next; to_remove_head->free(); to_remove_head = next; @@ -506,7 +494,7 @@ PrunedInfo LRUCache::prune_if(CachePrunePredicate pred, bool lazy_mode) { int64_t pruned_size = 0; while (to_remove_head != nullptr) { ++pruned_count; - pruned_size += to_remove_head->bytes; + pruned_size += to_remove_head->total_size; LRUHandle* next = to_remove_head->next; to_remove_head->free(); to_remove_head = next; @@ -534,9 +522,6 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity, _shards(nullptr), _last_id(1), _total_capacity(total_capacity) { - _mem_tracker = std::make_unique( - MemTrackerLimiter::Type::GLOBAL, - fmt::format("{}[{}]", name, lru_cache_type_string(type))); CHECK(num_shards > 0) << "num_shards cannot be 0"; CHECK_EQ((num_shards & (num_shards - 1)), 0) << "num_shards should be power of two, but got " << num_shards; @@ -594,11 +579,9 @@ ShardedLRUCache::~ShardedLRUCache() { } Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t charge, - void (*deleter)(const CacheKey& key, void* value), - CachePriority priority, size_t bytes) { + CachePriority priority) { const uint32_t hash = _hash_slice(key); - return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, _mem_tracker.get(), - priority, bytes); + return _shards[_shard(hash)]->insert(key, hash, value, charge, priority); } Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) { @@ -620,11 +603,6 @@ void* ShardedLRUCache::value(Handle* handle) { return reinterpret_cast(handle)->value; } -Slice ShardedLRUCache::value_slice(Handle* handle) { - auto lru_handle = reinterpret_cast(handle); - return Slice((char*)lru_handle->value, lru_handle->charge); -} - uint64_t ShardedLRUCache::new_id() { return _last_id.fetch_add(1, std::memory_order_relaxed); } @@ -649,10 +627,6 @@ PrunedInfo ShardedLRUCache::prune_if(CachePrunePredicate pred, bool lazy_mode) { return pruned_info; } -int64_t ShardedLRUCache::mem_consumption() { - return _mem_tracker->consumption(); -} - int64_t ShardedLRUCache::get_usage() { size_t total_usage = 0; for (int i = 0; i < _num_shards; i++) { @@ -683,16 +657,13 @@ void ShardedLRUCache::update_cache_metrics() const { } Cache::Handle* DummyLRUCache::insert(const CacheKey& key, void* value, size_t charge, - void (*deleter)(const CacheKey& key, void* value), - CachePriority priority, size_t bytes) { + CachePriority priority) { size_t handle_size = sizeof(LRUHandle); auto* e = reinterpret_cast(malloc(handle_size)); e->value = value; - e->deleter = deleter; e->charge = charge; e->key_length = 0; e->total_size = 0; - e->bytes = 0; e->hash = 0; e->refs = 1; // only one for the returned handle e->next = e->prev = nullptr; @@ -712,9 +683,4 @@ void* DummyLRUCache::value(Handle* handle) { return reinterpret_cast(handle)->value; } -Slice DummyLRUCache::value_slice(Handle* handle) { - auto* lru_handle = reinterpret_cast(handle); - return Slice((char*)lru_handle->value, lru_handle->charge); -} - } // namespace doris diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h index d5faba6a2f679e..5067692104401e 100644 --- a/be/src/olap/lru_cache.h +++ b/be/src/olap/lru_cache.h @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -19,11 +20,9 @@ #include #include -#include "runtime/memory/mem_tracker_limiter.h" -#include "runtime/thread_context.h" +#include "runtime/memory/lru_cache_value_base.h" #include "util/doris_metrics.h" #include "util/metrics.h" -#include "util/slice.h" namespace doris { @@ -182,8 +181,7 @@ class Cache { // When the inserted entry is no longer needed, the key and // value will be passed to "deleter". virtual Handle* insert(const CacheKey& key, void* value, size_t charge, - void (*deleter)(const CacheKey& key, void* value), - CachePriority priority = CachePriority::NORMAL, size_t bytes = -1) = 0; + CachePriority priority = CachePriority::NORMAL) = 0; // If the cache has no mapping for "key", returns nullptr. // @@ -203,10 +201,6 @@ class Cache { // REQUIRES: handle must have been returned by a method on *this. virtual void* value(Handle* handle) = 0; - // Return the value in Slice format encapsulated in the given handle - // returned by a successful lookup() - virtual Slice value_slice(Handle* handle) = 0; - // If the cache contains entry for key, erase it. Note that the // underlying entry will be kept around until all existing handles // to it have been released. @@ -231,8 +225,6 @@ class Cache { // may hold lock for a long time to execute predicate. virtual PrunedInfo prune_if(CachePrunePredicate pred, bool lazy_mode = false) { return {0, 0}; } - virtual int64_t mem_consumption() = 0; - virtual int64_t get_usage() = 0; virtual size_t get_total_capacity() = 0; @@ -243,21 +235,20 @@ class Cache { // An entry is a variable length heap-allocated structure. Entries // are kept in a circular doubly linked list ordered by access time. +// Note: member variables can only be POD types and raw pointer, +// cannot be class objects or smart pointers, because LRUHandle will be created using malloc. struct LRUHandle { void* value = nullptr; - void (*deleter)(const CacheKey&, void* value); struct LRUHandle* next_hash = nullptr; // next entry in hash table struct LRUHandle* next = nullptr; // next entry in lru list struct LRUHandle* prev = nullptr; // previous entry in lru list size_t charge; size_t key_length; size_t total_size; // Entry charge, used to limit cache capacity, LRUCacheType::SIZE including key length. - size_t bytes; // Used by LRUCacheType::NUMBER, LRUCacheType::SIZE equal to total_size. bool in_cache; // Whether entry is in the cache. uint32_t refs; uint32_t hash; // Hash of key(); used for fast sharding and comparisons CachePriority priority = CachePriority::NORMAL; - MemTrackerLimiter* mem_tracker; LRUCacheType type; int64_t last_visit_time; // Save the last visit time of this cache entry. char key_data[1]; // Beginning of key @@ -274,10 +265,8 @@ struct LRUHandle { } void free() { - (*deleter)(key(), value); - if (bytes != 0) { // DummyLRUCache bytes always equal to 0 - THREAD_MEM_TRACKER_TRANSFER_FROM(bytes, mem_tracker); - DorisMetrics::instance()->lru_cache_memory_bytes->increment(-bytes); + if (value != nullptr) { // value allows null pointer. + delete (LRUCacheValueBase*)value; } ::free(this); } @@ -346,9 +335,7 @@ class LRUCache { // Like Cache methods, but with an extra "hash" parameter. // Must call release on the returned handle pointer. Cache::Handle* insert(const CacheKey& key, uint32_t hash, void* value, size_t charge, - void (*deleter)(const CacheKey& key, void* value), - MemTrackerLimiter* tracker, - CachePriority priority = CachePriority::NORMAL, size_t bytes = -1); + CachePriority priority = CachePriority::NORMAL); Cache::Handle* lookup(const CacheKey& key, uint32_t hash); void release(Cache::Handle* handle); void erase(const CacheKey& key, uint32_t hash); @@ -406,18 +393,14 @@ class ShardedLRUCache : public Cache { public: virtual ~ShardedLRUCache(); virtual Handle* insert(const CacheKey& key, void* value, size_t charge, - void (*deleter)(const CacheKey& key, void* value), - CachePriority priority = CachePriority::NORMAL, - size_t bytes = -1) override; + CachePriority priority = CachePriority::NORMAL) override; virtual Handle* lookup(const CacheKey& key) override; virtual void release(Handle* handle) override; virtual void erase(const CacheKey& key) override; virtual void* value(Handle* handle) override; - Slice value_slice(Handle* handle) override; virtual uint64_t new_id() override; PrunedInfo prune() override; PrunedInfo prune_if(CachePrunePredicate pred, bool lazy_mode = false) override; - int64_t mem_consumption() override; int64_t get_usage() override; size_t get_total_capacity() override { return _total_capacity; }; @@ -434,17 +417,6 @@ class ShardedLRUCache : public Cache { void update_cache_metrics() const; - static std::string lru_cache_type_string(LRUCacheType type) { - switch (type) { - case LRUCacheType::SIZE: - return "size"; - case LRUCacheType::NUMBER: - return "number"; - default: - LOG(FATAL) << "not match type of lru cache:" << static_cast(type); - } - } - private: static uint32_t _hash_slice(const CacheKey& s); uint32_t _shard(uint32_t hash) { @@ -458,7 +430,6 @@ class ShardedLRUCache : public Cache { std::atomic _last_id; size_t _total_capacity; - std::unique_ptr _mem_tracker; std::shared_ptr _entity; IntGauge* cache_capacity = nullptr; IntGauge* cache_usage = nullptr; @@ -478,19 +449,16 @@ class DummyLRUCache : public Cache { public: // Must call release on the returned handle pointer. Handle* insert(const CacheKey& key, void* value, size_t charge, - void (*deleter)(const CacheKey& key, void* value), - CachePriority priority = CachePriority::NORMAL, size_t bytes = -1) override; + CachePriority priority = CachePriority::NORMAL) override; Handle* lookup(const CacheKey& key) override { return nullptr; }; void release(Handle* handle) override; void erase(const CacheKey& key) override {}; void* value(Handle* handle) override; - Slice value_slice(Handle* handle) override; uint64_t new_id() override { return 0; }; PrunedInfo prune() override { return {0, 0}; }; PrunedInfo prune_if(CachePrunePredicate pred, bool lazy_mode = false) override { return {0, 0}; }; - int64_t mem_consumption() override { return 0; }; int64_t get_usage() override { return 0; }; size_t get_total_capacity() override { return 0; }; }; diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp index db22457b329850..fc87b498ba4d1a 100644 --- a/be/src/olap/page_cache.cpp +++ b/be/src/olap/page_cache.cpp @@ -28,9 +28,8 @@ StoragePageCache* StoragePageCache::create_global_cache(size_t capacity, int32_t index_cache_percentage, int64_t pk_index_cache_capacity, uint32_t num_shards) { - StoragePageCache* res = new StoragePageCache(capacity, index_cache_percentage, - pk_index_cache_capacity, num_shards); - return res; + return new StoragePageCache(capacity, index_cache_percentage, pk_index_cache_capacity, + num_shards); } StoragePageCache::StoragePageCache(size_t capacity, int32_t index_cache_percentage, @@ -54,8 +53,8 @@ StoragePageCache::StoragePageCache(size_t capacity, int32_t index_cache_percenta bool StoragePageCache::lookup(const CacheKey& key, PageCacheHandle* handle, segment_v2::PageTypePB page_type) { - auto cache = _get_page_cache(page_type); - auto lru_handle = cache->lookup(key.encode()); + auto* cache = _get_page_cache(page_type); + auto* lru_handle = cache->lookup(key.encode()); if (lru_handle == nullptr) { return false; } @@ -65,18 +64,13 @@ bool StoragePageCache::lookup(const CacheKey& key, PageCacheHandle* handle, void StoragePageCache::insert(const CacheKey& key, DataPage* data, PageCacheHandle* handle, segment_v2::PageTypePB page_type, bool in_memory) { - auto deleter = [](const doris::CacheKey& key, void* value) { - DataPage* cache_value = (DataPage*)value; - delete cache_value; - }; - CachePriority priority = CachePriority::NORMAL; if (in_memory) { priority = CachePriority::DURABLE; } - auto cache = _get_page_cache(page_type); - auto lru_handle = cache->insert(key.encode(), data, data->capacity(), deleter, priority); + auto* cache = _get_page_cache(page_type); + auto* lru_handle = cache->insert(key.encode(), data, data->capacity(), 0, priority); *handle = PageCacheHandle(cache, lru_handle); } diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h index 2ea7c9a674aeac..97ffe9fba521e2 100644 --- a/be/src/olap/page_cache.h +++ b/be/src/olap/page_cache.h @@ -28,6 +28,7 @@ #include "olap/lru_cache.h" #include "runtime/memory/lru_cache_policy.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "util/slice.h" #include "vec/common/allocator.h" #include "vec/common/allocator_fwd.h" @@ -37,20 +38,23 @@ namespace doris { class PageCacheHandle; template -class PageBase : private TAllocator { +class PageBase : private TAllocator, public LRUCacheValueBase { public: - PageBase() : _data(nullptr), _size(0), _capacity(0) {} + PageBase() = default; - PageBase(size_t b) : _size(b), _capacity(b) { + PageBase(size_t b, const std::shared_ptr& mem_tracker) + : LRUCacheValueBase(mem_tracker), _size(b), _capacity(b) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); _data = reinterpret_cast(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16)); } PageBase(const PageBase&) = delete; PageBase& operator=(const PageBase&) = delete; - ~PageBase() { + ~PageBase() override { if (_data != nullptr) { DCHECK(_capacity != 0 && _size != 0); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); TAllocator::free(_data, _capacity); } } @@ -67,7 +71,7 @@ class PageBase : private TAllocator { private: char* _data = nullptr; // Effective size, smaller than capacity, such as data page remove checksum suffix. - size_t _size; + size_t _size = 0; size_t _capacity = 0; }; @@ -157,6 +161,10 @@ class StoragePageCache { void insert(const CacheKey& key, DataPage* data, PageCacheHandle* handle, segment_v2::PageTypePB page_type, bool in_memory = false); + std::shared_ptr mem_tracker(segment_v2::PageTypePB page_type) { + return _get_page_cache(page_type)->mem_tracker(); + } + private: StoragePageCache(); @@ -168,16 +176,16 @@ class StoragePageCache { // delete bitmap in unique key with mow std::unique_ptr _pk_index_page_cache; - Cache* _get_page_cache(segment_v2::PageTypePB page_type) { + LRUCachePolicy* _get_page_cache(segment_v2::PageTypePB page_type) { switch (page_type) { case segment_v2::DATA_PAGE: { - return _data_page_cache->cache(); + return _data_page_cache.get(); } case segment_v2::INDEX_PAGE: { - return _index_page_cache->cache(); + return _index_page_cache.get(); } case segment_v2::PRIMARY_KEY_INDEX_PAGE: { - return _pk_index_page_cache->cache(); + return _pk_index_page_cache.get(); } default: LOG(FATAL) << "get error type page cache"; @@ -192,8 +200,9 @@ class StoragePageCache { // class will release the cache entry when it is destroyed. class PageCacheHandle { public: - PageCacheHandle() {} - PageCacheHandle(Cache* cache, Cache::Handle* handle) : _cache(cache), _handle(handle) {} + PageCacheHandle() = default; + PageCacheHandle(LRUCachePolicy* cache, Cache::Handle* handle) + : _cache(cache), _handle(handle) {} ~PageCacheHandle() { if (_handle != nullptr) { _cache->release(_handle); @@ -212,14 +221,14 @@ class PageCacheHandle { return *this; } - Cache* cache() const { return _cache; } + LRUCachePolicy* cache() const { return _cache; } Slice data() const { - DataPage* cache_value = (DataPage*)_cache->value(_handle); - return Slice(cache_value->data(), cache_value->size()); + auto* cache_value = (DataPage*)_cache->value(_handle); + return {cache_value->data(), cache_value->size()}; } private: - Cache* _cache = nullptr; + LRUCachePolicy* _cache = nullptr; Cache::Handle* _handle = nullptr; // Don't allow copy and assign diff --git a/be/src/olap/rowset/segment_v2/bitshuffle_page_pre_decoder.h b/be/src/olap/rowset/segment_v2/bitshuffle_page_pre_decoder.h index 7ec7bed2237e39..2ab1b278c539c1 100644 --- a/be/src/olap/rowset/segment_v2/bitshuffle_page_pre_decoder.h +++ b/be/src/olap/rowset/segment_v2/bitshuffle_page_pre_decoder.h @@ -38,8 +38,8 @@ struct BitShufflePagePreDecoder : public DataPagePreDecoder { * @param size_of_tail including size of footer and null map * @return Status */ - virtual Status decode(std::unique_ptr* page, Slice* page_slice, - size_t size_of_tail) override { + Status decode(std::unique_ptr* page, Slice* page_slice, size_t size_of_tail, + const std::shared_ptr& mem_tracker) override { size_t num_elements, compressed_size, num_element_after_padding; int size_of_element; @@ -66,7 +66,8 @@ struct BitShufflePagePreDecoder : public DataPagePreDecoder { Slice decoded_slice; decoded_slice.size = size_of_dict_header + BITSHUFFLE_PAGE_HEADER_SIZE + num_element_after_padding * size_of_element + size_of_tail; - std::unique_ptr decoded_page = std::make_unique(decoded_slice.size); + std::unique_ptr decoded_page = + std::make_unique(decoded_slice.size, mem_tracker); decoded_slice.data = decoded_page->data(); if constexpr (USED_IN_DICT_ENCODING) { diff --git a/be/src/olap/rowset/segment_v2/encoding_info.h b/be/src/olap/rowset/segment_v2/encoding_info.h index c6f065a28ed150..d9207baa25ec5a 100644 --- a/be/src/olap/rowset/segment_v2/encoding_info.h +++ b/be/src/olap/rowset/segment_v2/encoding_info.h @@ -42,8 +42,8 @@ enum EncodingTypePB : int; // For better performance, some encodings (like BitShuffle) need to be decoded before being added to the PageCache. class DataPagePreDecoder { public: - virtual Status decode(std::unique_ptr* page, Slice* page_slice, - size_t size_of_tail) = 0; + virtual Status decode(std::unique_ptr* page, Slice* page_slice, size_t size_of_tail, + const std::shared_ptr& mem_tracker) = 0; virtual ~DataPagePreDecoder() = default; }; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp index 8359a2f896f01e..fe456d53eae380 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp @@ -80,21 +80,21 @@ InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t Status InvertedIndexSearcherCache::erase(const std::string& index_file_path) { InvertedIndexSearcherCache::CacheKey cache_key(index_file_path); - _policy->cache()->erase(cache_key.index_file_path); + _policy->erase(cache_key.index_file_path); return Status::OK(); } int64_t InvertedIndexSearcherCache::mem_consumption() { - return _policy->cache()->mem_consumption(); + return _policy->mem_consumption(); } bool InvertedIndexSearcherCache::lookup(const InvertedIndexSearcherCache::CacheKey& key, InvertedIndexCacheHandle* handle) { - auto* lru_handle = _policy->cache()->lookup(key.index_file_path); + auto* lru_handle = _policy->lookup(key.index_file_path); if (lru_handle == nullptr) { return false; } - *handle = InvertedIndexCacheHandle(_policy->cache(), lru_handle); + *handle = InvertedIndexCacheHandle(_policy.get(), lru_handle); return true; } @@ -107,18 +107,13 @@ void InvertedIndexSearcherCache::insert(const InvertedIndexSearcherCache::CacheK void InvertedIndexSearcherCache::insert(const InvertedIndexSearcherCache::CacheKey& cache_key, CacheValue* cache_value, InvertedIndexCacheHandle* handle) { auto* lru_handle = _insert(cache_key, cache_value); - *handle = InvertedIndexCacheHandle(_policy->cache(), lru_handle); + *handle = InvertedIndexCacheHandle(_policy.get(), lru_handle); } Cache::Handle* InvertedIndexSearcherCache::_insert(const InvertedIndexSearcherCache::CacheKey& key, CacheValue* value) { - auto deleter = [](const doris::CacheKey& key, void* value) { - auto* cache_value = (InvertedIndexSearcherCache::CacheValue*)value; - delete cache_value; - }; - - Cache::Handle* lru_handle = _policy->cache()->insert(key.index_file_path, value, value->size, - deleter, CachePriority::NORMAL); + Cache::Handle* lru_handle = _policy->insert(key.index_file_path, value, value->size, + value->size, CachePriority::NORMAL); return lru_handle; } @@ -126,20 +121,16 @@ bool InvertedIndexQueryCache::lookup(const CacheKey& key, InvertedIndexQueryCach if (key.encode().empty()) { return false; } - auto* lru_handle = cache()->lookup(key.encode()); + auto* lru_handle = LRUCachePolicy::lookup(key.encode()); if (lru_handle == nullptr) { return false; } - *handle = InvertedIndexQueryCacheHandle(cache(), lru_handle); + *handle = InvertedIndexQueryCacheHandle(this, lru_handle); return true; } void InvertedIndexQueryCache::insert(const CacheKey& key, std::shared_ptr bitmap, InvertedIndexQueryCacheHandle* handle) { - auto deleter = [](const doris::CacheKey& key, void* value) { - delete (InvertedIndexQueryCache::CacheValue*)value; - }; - std::unique_ptr cache_value_ptr = std::make_unique(); cache_value_ptr->bitmap = bitmap; @@ -147,13 +138,14 @@ void InvertedIndexQueryCache::insert(const CacheKey& key, std::shared_ptrinsert(key.encode(), (void*)cache_value_ptr.release(), - bitmap->getSizeInBytes(), deleter, CachePriority::NORMAL); - *handle = InvertedIndexQueryCacheHandle(cache(), lru_handle); + auto* lru_handle = LRUCachePolicy::insert(key.encode(), (void*)cache_value_ptr.release(), + bitmap->getSizeInBytes(), bitmap->getSizeInBytes(), + CachePriority::NORMAL); + *handle = InvertedIndexQueryCacheHandle(this, lru_handle); } int64_t InvertedIndexQueryCache::mem_consumption() { - return cache()->mem_consumption(); + return LRUCachePolicy::mem_consumption(); } } // namespace doris::segment_v2 diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.h b/be/src/olap/rowset/segment_v2/inverted_index_cache.h index cbeb8adf6ef00c..3292b10e25bd37 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_cache.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.h @@ -81,14 +81,16 @@ class InvertedIndexSearcherCache { // The cache value of index_searcher lru cache. // Holding an opened index_searcher. - struct CacheValue { + class CacheValue : public LRUCacheValueBase { + public: IndexSearcherPtr index_searcher; size_t size = 0; int64_t last_visit_time; - CacheValue() = default; + CacheValue() : LRUCacheValueBase(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE) {} explicit CacheValue(IndexSearcherPtr searcher, size_t mem_size, int64_t visit_time) - : index_searcher(std::move(searcher)) { + : LRUCacheValueBase(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE), + index_searcher(std::move(searcher)) { size = mem_size; last_visit_time = visit_time; } @@ -119,7 +121,7 @@ class InvertedIndexSearcherCache { // function `erase` called after compaction remove segment Status erase(const std::string& index_file_path); - void release(Cache::Handle* handle) { _policy->cache()->release(handle); } + void release(Cache::Handle* handle) { _policy->release(handle); } int64_t mem_consumption(); @@ -162,7 +164,7 @@ using IndexCacheValuePtr = std::unique_ptr bitmap; }; @@ -267,7 +272,7 @@ class InvertedIndexQueryCacheHandle { public: InvertedIndexQueryCacheHandle() = default; - InvertedIndexQueryCacheHandle(Cache* cache, Cache::Handle* handle) + InvertedIndexQueryCacheHandle(LRUCachePolicy* cache, Cache::Handle* handle) : _cache(cache), _handle(handle) {} ~InvertedIndexQueryCacheHandle() { @@ -288,8 +293,7 @@ class InvertedIndexQueryCacheHandle { return *this; } - Cache* cache() const { return _cache; } - Slice data() const { return _cache->value_slice(_handle); } + LRUCachePolicy* cache() const { return _cache; } std::shared_ptr get_bitmap() const { if (!_cache) { @@ -299,7 +303,7 @@ class InvertedIndexQueryCacheHandle { } private: - Cache* _cache = nullptr; + LRUCachePolicy* _cache = nullptr; Cache::Handle* _handle = nullptr; // Don't allow copy and assign diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index dac9012abcd377..cf6e0541612739 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -143,8 +143,15 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* opts.file_reader->path().native()); } + std::shared_ptr page_mem_tracker; + if (opts.use_page_cache && cache) { + page_mem_tracker = cache->mem_tracker(opts.type); + } else { + page_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + } + // hold compressed page at first, reset to decompressed page later - std::unique_ptr page = std::make_unique(page_size); + std::unique_ptr page = std::make_unique(page_size, page_mem_tracker); Slice page_slice(page->data(), page_size); { SCOPED_RAW_TIMER(&opts.stats->io_ns); @@ -182,8 +189,8 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* opts.file_reader->path().native()); } SCOPED_RAW_TIMER(&opts.stats->decompress_ns); - std::unique_ptr decompressed_page = - std::make_unique(footer->uncompressed_size() + footer_size + 4); + std::unique_ptr decompressed_page = std::make_unique( + footer->uncompressed_size() + footer_size + 4, page_mem_tracker); // decompress page body Slice compressed_body(page_slice.data, body_size); @@ -210,8 +217,8 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* auto* pre_decoder = opts.encoding_info->get_data_page_pre_decoder(); if (pre_decoder) { RETURN_IF_ERROR(pre_decoder->decode( - &page, &page_slice, - footer->data_page_footer().nullmap_size() + footer_size + 4)); + &page, &page_slice, footer->data_page_footer().nullmap_size() + footer_size + 4, + page_mem_tracker)); } } diff --git a/be/src/olap/schema_cache.h b/be/src/olap/schema_cache.h index 326d8875f81853..b94fcf8ccdbb2f 100644 --- a/be/src/olap/schema_cache.h +++ b/be/src/olap/schema_cache.h @@ -65,10 +65,10 @@ class SchemaCache : public LRUCachePolicy { if (!instance() || schema_key.empty()) { return {}; } - auto lru_handle = cache()->lookup(schema_key); + auto* lru_handle = lookup(schema_key); if (lru_handle) { - Defer release([cache = cache(), lru_handle] { cache->release(lru_handle); }); - auto value = (CacheValue*)cache()->value(lru_handle); + Defer release([cache = this, lru_handle] { cache->release(lru_handle); }); + auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle); VLOG_DEBUG << "use cache schema"; if constexpr (std::is_same_v) { return value->tablet_schema; @@ -86,7 +86,7 @@ class SchemaCache : public LRUCachePolicy { if (!instance() || key.empty()) { return; } - CacheValue* value = new CacheValue; + auto* value = new CacheValue; if constexpr (std::is_same_v) { value->type = Type::TABLET_SCHEMA; value->tablet_schema = schema; @@ -94,19 +94,18 @@ class SchemaCache : public LRUCachePolicy { value->type = Type::SCHEMA; value->schema = schema; } - auto deleter = [](const doris::CacheKey& key, void* value) { - CacheValue* cache_value = (CacheValue*)value; - delete cache_value; - }; - auto lru_handle = - cache()->insert(key, value, 1, deleter, CachePriority::NORMAL, schema->mem_size()); - cache()->release(lru_handle); + + auto lru_handle = insert(key, value, 1, schema->mem_size(), CachePriority::NORMAL); + release(lru_handle); } // Try to prune the cache if expired. Status prune(); - struct CacheValue { + class CacheValue : public LRUCacheValueBase { + public: + CacheValue() : LRUCacheValueBase(CachePolicy::CacheType::SCHEMA_CACHE) {} + Type type; // either tablet_schema or schema TabletSchemaSPtr tablet_schema = nullptr; diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp index 0d068bfcbe6271..f6ca7b72a2a44a 100644 --- a/be/src/olap/segment_loader.cpp +++ b/be/src/olap/segment_loader.cpp @@ -29,29 +29,23 @@ SegmentLoader* SegmentLoader::instance() { } bool SegmentCache::lookup(const SegmentCache::CacheKey& key, SegmentCacheHandle* handle) { - auto lru_handle = cache()->lookup(key.encode()); + auto* lru_handle = LRUCachePolicy::lookup(key.encode()); if (lru_handle == nullptr) { return false; } - handle->push_segment(cache(), lru_handle); + handle->push_segment(this, lru_handle); return true; } void SegmentCache::insert(const SegmentCache::CacheKey& key, SegmentCache::CacheValue& value, SegmentCacheHandle* handle) { - auto deleter = [](const doris::CacheKey& key, void* value) { - SegmentCache::CacheValue* cache_value = (SegmentCache::CacheValue*)value; - cache_value->segment.reset(); - delete cache_value; - }; - - auto lru_handle = cache()->insert(key.encode(), &value, 1, deleter, CachePriority::NORMAL, - value.segment->meta_mem_usage()); - handle->push_segment(cache(), lru_handle); + auto* lru_handle = LRUCachePolicy::insert( + key.encode(), &value, 1, value.segment->meta_mem_usage(), CachePriority::NORMAL); + handle->push_segment(this, lru_handle); } void SegmentCache::erase(const SegmentCache::CacheKey& key) { - cache()->erase(key.encode()); + LRUCachePolicy::erase(key.encode()); } Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset, @@ -68,7 +62,7 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset, RETURN_IF_ERROR(rowset->load_segment(i, &segment)); if (use_cache && !config::disable_segment_cache) { // memory of SegmentCache::CacheValue will be handled by SegmentCache - SegmentCache::CacheValue* cache_value = new SegmentCache::CacheValue(); + auto* cache_value = new SegmentCache::CacheValue(); cache_value->segment = std::move(segment); _segment_cache->insert(cache_key, *cache_value, cache_handle); } else { diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h index d0e2ae81509edd..d952fd522d0b73 100644 --- a/be/src/olap/segment_loader.h +++ b/be/src/olap/segment_loader.h @@ -72,7 +72,11 @@ class SegmentCache : public LRUCachePolicy { // The cache value of segment lru cache. // Holding all opened segments of a rowset. - struct CacheValue { + class CacheValue : public LRUCacheValueBase { + public: + CacheValue() : LRUCacheValueBase(CachePolicy::CacheType::SEGMENT_CACHE) {} + ~CacheValue() override { segment.reset(); } + segment_v2::SegmentSharedPtr segment; }; @@ -131,7 +135,7 @@ class SegmentCacheHandle { SegmentCacheHandle() = default; ~SegmentCacheHandle() = default; - void push_segment(Cache* cache, Cache::Handle* handle) { + void push_segment(LRUCachePolicy* cache, Cache::Handle* handle) { segments.push_back(((SegmentCache::CacheValue*)cache->value(handle))->segment); cache->release(handle); } diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 0a94c264110c4d..4caa1721f9f2b1 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -1487,10 +1487,10 @@ void StorageEngine::_decrease_low_priority_task_nums(DataDir* dir) { } int CreateTabletIdxCache::get_index(const std::string& key) { - auto lru_handle = cache()->lookup(key); + auto* lru_handle = lookup(key); if (lru_handle) { - Defer release([cache = cache(), lru_handle] { cache->release(lru_handle); }); - auto value = (CacheValue*)cache()->value(lru_handle); + Defer release([cache = this, lru_handle] { cache->release(lru_handle); }); + auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle); VLOG_DEBUG << "use create tablet idx cache key=" << key << " value=" << value->idx; return value->idx; } @@ -1499,14 +1499,10 @@ int CreateTabletIdxCache::get_index(const std::string& key) { void CreateTabletIdxCache::set_index(const std::string& key, int next_idx) { assert(next_idx >= 0); - CacheValue* value = new CacheValue; + auto* value = new CacheValue; value->idx = next_idx; - auto deleter = [](const doris::CacheKey& key, void* value) { - CacheValue* cache_value = (CacheValue*)value; - delete cache_value; - }; - auto lru_handle = cache()->insert(key, value, 1, deleter, CachePriority::NORMAL, sizeof(int)); - cache()->release(lru_handle); + auto* lru_handle = insert(key, value, 1, sizeof(int), CachePriority::NORMAL); + release(lru_handle); } } // namespace doris diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index b0e9ed6000e827..30845e4e66a4e1 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -527,7 +527,10 @@ class CreateTabletIdxCache : public LRUCachePolicy { void set_index(const std::string& key, int next_idx); - struct CacheValue { + class CacheValue : public LRUCacheValueBase { + public: + CacheValue() : LRUCacheValueBase(CachePolicy::CacheType::CREATE_TABLET_RR_IDX_CACHE) {} + int idx = 0; }; diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 11357264df17ea..dc90093a21320b 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -1108,11 +1108,8 @@ std::shared_ptr DeleteBitmap::get_agg(const BitmapKey& bmk) co val->bitmap |= bm; } } - static auto deleter = [](const CacheKey& key, void* value) { - delete (AggCache::Value*)value; // Just delete to reclaim - }; size_t charge = val->bitmap.getSizeInBytes() + sizeof(AggCache::Value); - handle = _agg_cache->repr()->insert(key, val, charge, deleter, CachePriority::NORMAL); + handle = _agg_cache->repr()->insert(key, val, charge, charge, CachePriority::NORMAL); } // It is natural for the cache to reclaim the underlying memory diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index e380ebefe9db9a..f5b4d6625b1ec2 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -502,7 +502,10 @@ class DeleteBitmap { class AggCache { public: - struct Value { + class Value : public LRUCacheValueBase { + public: + Value() : LRUCacheValueBase(CachePolicy::CacheType::DELETE_BITMAP_AGG_CACHE) {} + roaring::Roaring bitmap; }; @@ -517,7 +520,7 @@ class DeleteBitmap { } } - static Cache* repr() { return s_repr.load(std::memory_order_acquire)->cache(); } + static LRUCachePolicy* repr() { return s_repr.load(std::memory_order_acquire); } static std::atomic s_repr; }; diff --git a/be/src/olap/tablet_schema_cache.cpp b/be/src/olap/tablet_schema_cache.cpp index 6cadee3a00101f..e339c947bb97a4 100644 --- a/be/src/olap/tablet_schema_cache.cpp +++ b/be/src/olap/tablet_schema_cache.cpp @@ -28,10 +28,10 @@ bvar::Adder g_tablet_schema_cache_columns_count("tablet_schema_cache_co namespace doris { std::pair TabletSchemaCache::insert(const std::string& key) { - auto* lru_handle = cache()->lookup(key); + auto* lru_handle = lookup(key); TabletSchemaSPtr tablet_schema_ptr; if (lru_handle) { - auto* value = (CacheValue*)cache()->value(lru_handle); + auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle); tablet_schema_ptr = value->tablet_schema; } else { auto* value = new CacheValue; @@ -40,14 +40,8 @@ std::pair TabletSchemaCache::insert(const std: pb.ParseFromString(key); tablet_schema_ptr->init_from_pb(pb); value->tablet_schema = tablet_schema_ptr; - auto deleter = [](const doris::CacheKey& key, void* value) { - auto* cache_value = (CacheValue*)value; - g_tablet_schema_cache_count << -1; - g_tablet_schema_cache_columns_count << -cache_value->tablet_schema->num_columns(); - delete cache_value; - }; - lru_handle = cache()->insert(key, value, tablet_schema_ptr->num_columns(), deleter, - CachePriority::NORMAL, 0); + lru_handle = LRUCachePolicy::insert(key, value, tablet_schema_ptr->num_columns(), 0, + CachePriority::NORMAL); g_tablet_schema_cache_count << 1; g_tablet_schema_cache_columns_count << tablet_schema_ptr->num_columns(); } @@ -56,7 +50,12 @@ std::pair TabletSchemaCache::insert(const std: } void TabletSchemaCache::release(Cache::Handle* lru_handle) { - cache()->release(lru_handle); + LRUCachePolicy::release(lru_handle); +} + +TabletSchemaCache::CacheValue::~CacheValue() { + g_tablet_schema_cache_count << -1; + g_tablet_schema_cache_columns_count << -tablet_schema->num_columns(); } } // namespace doris diff --git a/be/src/olap/tablet_schema_cache.h b/be/src/olap/tablet_schema_cache.h index 6eb93105f830e3..447be401eca92c 100644 --- a/be/src/olap/tablet_schema_cache.h +++ b/be/src/olap/tablet_schema_cache.h @@ -43,7 +43,11 @@ class TabletSchemaCache : public LRUCachePolicy { void release(Cache::Handle*); private: - struct CacheValue { + class CacheValue : public LRUCacheValueBase { + public: + CacheValue() : LRUCacheValueBase(CachePolicy::CacheType::TABLET_SCHEMA_CACHE) {} + ~CacheValue() override; + TabletSchemaSPtr tablet_schema; }; }; diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 739e85dd0fdc90..75d38ea735aa02 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -863,12 +863,12 @@ int64_t TxnManager::get_txn_by_tablet_version(int64_t tablet_id, int64_t version memcpy(key + sizeof(int64_t), &version, sizeof(int64_t)); CacheKey cache_key((const char*)&key, sizeof(key)); - auto* handle = _tablet_version_cache->cache()->lookup(cache_key); + auto* handle = _tablet_version_cache->lookup(cache_key); if (handle == nullptr) { return -1; } - int64_t res = *(int64_t*)_tablet_version_cache->cache()->value(handle); - _tablet_version_cache->cache()->release(handle); + int64_t res = ((CacheValue*)_tablet_version_cache->value(handle))->value; + _tablet_version_cache->release(handle); return res; } @@ -878,16 +878,11 @@ void TxnManager::update_tablet_version_txn(int64_t tablet_id, int64_t version, i memcpy(key + sizeof(int64_t), &version, sizeof(int64_t)); CacheKey cache_key((const char*)&key, sizeof(key)); - int64_t* value = new int64_t; - *value = txn_id; - auto deleter = [](const doris::CacheKey& key, void* value) { - int64_t* cache_value = (int64_t*)value; - delete cache_value; - }; - - auto* handle = _tablet_version_cache->cache()->insert(cache_key, value, 1, deleter, - CachePriority::NORMAL, sizeof(txn_id)); - _tablet_version_cache->cache()->release(handle); + auto* value = new CacheValue; + value->value = txn_id; + auto* handle = _tablet_version_cache->insert(cache_key, value, 1, sizeof(txn_id), + CachePriority::NORMAL); + _tablet_version_cache->release(handle); } TxnState TxnManager::get_txn_state(TPartitionId partition_id, TTransactionId transaction_id, diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 67a62d557aafdc..724255bccafa53 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -126,6 +126,13 @@ class TxnManager { delete[] _txn_tablet_delta_writer_map_locks; } + class CacheValue : public LRUCacheValueBase { + public: + CacheValue() : LRUCacheValueBase(CachePolicy::CacheType::TABLET_VERSION_CACHE) {} + + int64_t value; + }; + // add a txn to manager // partition id is useful in publish version stage because version is associated with partition Status prepare_txn(TPartitionId partition_id, const Tablet& tablet, diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 4d98276f0d2626..4b0cc32f9c99ac 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -114,8 +114,6 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { return Status::OK(); } -static void dummy_deleter(const CacheKey& key, void* value) {} - Status LoadChannelMgr::_get_load_channel(std::shared_ptr& channel, bool& is_eof, const UniqueId& load_id, const PTabletWriterAddBlockRequest& request) { @@ -123,10 +121,10 @@ Status LoadChannelMgr::_get_load_channel(std::shared_ptr& channel, std::lock_guard l(_lock); auto it = _load_channels.find(load_id); if (it == _load_channels.end()) { - auto handle = _last_success_channels->cache()->lookup(load_id.to_string()); + auto* handle = _last_success_channels->lookup(load_id.to_string()); // success only when eos be true if (handle != nullptr) { - _last_success_channels->cache()->release(handle); + _last_success_channels->release(handle); if (request.has_eos() && request.eos()) { is_eof = true; return Status::OK(); @@ -182,9 +180,8 @@ void LoadChannelMgr::_finish_load_channel(const UniqueId load_id) { if (_load_channels.find(load_id) != _load_channels.end()) { _load_channels.erase(load_id); } - auto handle = _last_success_channels->cache()->insert(load_id.to_string(), nullptr, 1, - dummy_deleter); - _last_success_channels->cache()->release(handle); + auto* handle = _last_success_channels->insert(load_id.to_string(), nullptr, 1, 1); + _last_success_channels->release(handle); } VLOG_CRITICAL << "removed load channel " << load_id; } diff --git a/be/src/runtime/memory/cache_manager.cpp b/be/src/runtime/memory/cache_manager.cpp index 4b242f5d3aedbf..d17954ffe8bbc0 100644 --- a/be/src/runtime/memory/cache_manager.cpp +++ b/be/src/runtime/memory/cache_manager.cpp @@ -26,7 +26,8 @@ int64_t CacheManager::for_each_cache_prune_stale_wrap( std::function func, RuntimeProfile* profile) { int64_t freed_size = 0; std::lock_guard l(_caches_lock); - for (auto* cache_policy : _caches) { + for (const auto& pair : _caches) { + auto* cache_policy = pair.second; if (!cache_policy->enable_prune()) { continue; } @@ -57,11 +58,7 @@ int64_t CacheManager::for_each_cache_prune_all(RuntimeProfile* profile) { void CacheManager::clear_once(CachePolicy::CacheType type) { std::lock_guard l(_caches_lock); - for (auto* cache_policy : _caches) { - if (cache_policy->type() == type) { - cache_policy->prune_all(true); // will print log - } - } + _caches[type]->prune_all(true); // will print log } } // namespace doris diff --git a/be/src/runtime/memory/cache_manager.h b/be/src/runtime/memory/cache_manager.h index d17e8eff9869f9..672c88c615868a 100644 --- a/be/src/runtime/memory/cache_manager.h +++ b/be/src/runtime/memory/cache_manager.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include "runtime/exec_env.h" #include "runtime/memory/cache_policy.h" @@ -29,25 +30,35 @@ namespace doris { // Hold the list of all caches, for prune when memory not enough or timing. class CacheManager { public: - static CacheManager* create_global_instance() { - CacheManager* res = new CacheManager(); - return res; - } + static CacheManager* create_global_instance() { return new CacheManager(); } static CacheManager* instance() { return ExecEnv::GetInstance()->get_cache_manager(); } - std::list::iterator register_cache(CachePolicy* cache) { + void register_cache(CachePolicy* cache) { std::lock_guard l(_caches_lock); - return _caches.insert(_caches.end(), cache); + auto it = _caches.find(cache->type()); + if (it != _caches.end()) { +#ifdef BE_TEST + _caches.erase(it); +#else + LOG(FATAL) << "Repeat register cache " << CachePolicy::type_string(cache->type()); +#endif // BE_TEST + } + _caches.insert({cache->type(), cache}); } - void unregister_cache(std::list::iterator it) { + void unregister_cache(CachePolicy::CacheType type) { +#ifdef BE_TEST + return; +#endif // BE_TEST std::lock_guard l(_caches_lock); + auto it = _caches.find(type); if (it != _caches.end()) { _caches.erase(it); - it = _caches.end(); } } + CachePolicy* get_cache(CachePolicy::CacheType type) { return _caches[type]; } + int64_t for_each_cache_prune_stale_wrap(std::function func, RuntimeProfile* profile = nullptr); @@ -73,7 +84,7 @@ class CacheManager { private: std::mutex _caches_lock; - std::list _caches; + std::unordered_map _caches; int64_t _last_prune_stale_timestamp = 0; int64_t _last_prune_all_timestamp = 0; }; diff --git a/be/src/runtime/memory/cache_policy.cpp b/be/src/runtime/memory/cache_policy.cpp index 99af4857038613..4e50d64d88eed1 100644 --- a/be/src/runtime/memory/cache_policy.cpp +++ b/be/src/runtime/memory/cache_policy.cpp @@ -23,12 +23,12 @@ namespace doris { CachePolicy::CachePolicy(CacheType type, uint32_t stale_sweep_time_s, bool enable_prune) : _type(type), _stale_sweep_time_s(stale_sweep_time_s), _enable_prune(enable_prune) { - _it = CacheManager::instance()->register_cache(this); + CacheManager::instance()->register_cache(this); init_profile(); } CachePolicy::~CachePolicy() { - CacheManager::instance()->unregister_cache(_it); + CacheManager::instance()->unregister_cache(_type); } } // namespace doris diff --git a/be/src/runtime/memory/cache_policy.h b/be/src/runtime/memory/cache_policy.h index 538b30099f49b9..459e6423add2da 100644 --- a/be/src/runtime/memory/cache_policy.h +++ b/be/src/runtime/memory/cache_policy.h @@ -17,6 +17,7 @@ #pragma once +#include "runtime/memory/mem_tracker_limiter.h" #include "util/runtime_profile.h" namespace doris { @@ -99,6 +100,8 @@ class CachePolicy { virtual void prune_all(bool force) = 0; CacheType type() { return _type; } + std::shared_ptr mem_tracker() { return _mem_tracker; } + int64_t mem_consumption() { return _mem_tracker->consumption(); } bool enable_prune() const { return _enable_prune; } RuntimeProfile* profile() { return _profile.get(); } @@ -113,8 +116,13 @@ class CachePolicy { _cost_timer = ADD_TIMER(_profile, "CostTime"); } + void init_mem_tracker(const std::string& name) { + _mem_tracker = std::make_shared(MemTrackerLimiter::Type::GLOBAL, name); + } + CacheType _type; - std::list::iterator _it; + + std::shared_ptr _mem_tracker; std::unique_ptr _profile; RuntimeProfile::Counter* _prune_stale_number_counter = nullptr; diff --git a/be/src/runtime/memory/lru_cache_policy.h b/be/src/runtime/memory/lru_cache_policy.h index 48b56e8b3b12d4..35cf767b84636f 100644 --- a/be/src/runtime/memory/lru_cache_policy.h +++ b/be/src/runtime/memory/lru_cache_policy.h @@ -19,8 +19,12 @@ #include +#include + #include "olap/lru_cache.h" #include "runtime/memory/cache_policy.h" +#include "runtime/memory/lru_cache_value_base.h" +#include "runtime/thread_context.h" #include "util/time.h" namespace doris { @@ -32,7 +36,7 @@ class LRUCachePolicy : public CachePolicy { uint32_t stale_sweep_time_s, uint32_t num_shards = DEFAULT_LRU_CACHE_NUM_SHARDS, uint32_t element_count_capacity = DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, bool enable_prune = true) - : CachePolicy(type, stale_sweep_time_s, enable_prune) { + : CachePolicy(type, stale_sweep_time_s, enable_prune), _lru_cache_type(lru_cache_type) { if (check_capacity(capacity, num_shards)) { _cache = std::shared_ptr( new ShardedLRUCache(type_string(type), capacity, lru_cache_type, num_shards, @@ -41,6 +45,8 @@ class LRUCachePolicy : public CachePolicy { CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache()); _cache = ExecEnv::GetInstance()->get_dummy_lru_cache(); } + init_mem_tracker( + fmt::format("{}[{}]", type_string(_type), lru_cache_type_string(_lru_cache_type))); } LRUCachePolicy(CacheType type, size_t capacity, LRUCacheType lru_cache_type, @@ -48,7 +54,7 @@ class LRUCachePolicy : public CachePolicy { uint32_t element_count_capacity, CacheValueTimeExtractor cache_value_time_extractor, bool cache_value_check_timestamp, bool enable_prune = true) - : CachePolicy(type, stale_sweep_time_s, enable_prune) { + : CachePolicy(type, stale_sweep_time_s, enable_prune), _lru_cache_type(lru_cache_type) { if (check_capacity(capacity, num_shards)) { _cache = std::shared_ptr( new ShardedLRUCache(type_string(type), capacity, lru_cache_type, num_shards, @@ -58,8 +64,12 @@ class LRUCachePolicy : public CachePolicy { CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache()); _cache = ExecEnv::GetInstance()->get_dummy_lru_cache(); } + init_mem_tracker( + fmt::format("{}[{}]", type_string(_type), lru_cache_type_string(_lru_cache_type))); } + ~LRUCachePolicy() override { _cache.reset(); } + bool check_capacity(size_t capacity, uint32_t num_shards) { if (capacity < num_shards) { LOG(INFO) << fmt::format( @@ -72,7 +82,43 @@ class LRUCachePolicy : public CachePolicy { return true; } - ~LRUCachePolicy() override = default; + static std::string lru_cache_type_string(LRUCacheType type) { + switch (type) { + case LRUCacheType::SIZE: + return "size"; + case LRUCacheType::NUMBER: + return "number"; + default: + LOG(FATAL) << "not match type of lru cache:" << static_cast(type); + } + } + + // Insert and cache value destroy will be manually consume tracking_bytes to mem tracker. + // If memory is allocated from Allocator, tracking_bytes will is 0, no longer manual tracking. + // If lru cache is LRUCacheType::SIZE, tracking_bytes will be equal to charge. + Cache::Handle* insert(const CacheKey& key, void* value, size_t charge, size_t tracking_bytes, + CachePriority priority = CachePriority::NORMAL) { + size_t bytes_with_handle = _get_bytes_with_handle(key, charge, tracking_bytes); + if (value != nullptr && tracking_bytes > 0) { + ((LRUCacheValueBase*)value)->mem_tracker()->cache_consume(bytes_with_handle); + ((LRUCacheValueBase*)value)->set_tracking_bytes(bytes_with_handle); + } + return _cache->insert(key, value, charge, priority); + } + + Cache::Handle* lookup(const CacheKey& key) { return _cache->lookup(key); } + + void release(Cache::Handle* handle) { _cache->release(handle); } + + void* value(Cache::Handle* handle) { return _cache->value(handle); } + + void erase(const CacheKey& key) { _cache->erase(key); } + + int64_t get_usage() { return _cache->get_usage(); } + + size_t get_total_capacity() { return _cache->get_total_capacity(); } + + uint64_t new_id() { return _cache->new_id(); }; // Try to prune the cache if expired. void prune_stale() override { @@ -81,7 +127,7 @@ class LRUCachePolicy : public CachePolicy { if (_stale_sweep_time_s <= 0 && _cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) { return; } - if (_cache->mem_consumption() > CACHE_MIN_FREE_SIZE) { + if (mem_consumption() > CACHE_MIN_FREE_SIZE) { COUNTER_SET(_cost_timer, (int64_t)0); SCOPED_TIMER(_cost_timer); const int64_t curtime = UnixMillis(); @@ -91,7 +137,7 @@ class LRUCachePolicy : public CachePolicy { }; LOG(INFO) << fmt::format("[MemoryGC] {} prune stale start, consumption {}", - type_string(_type), _cache->mem_consumption()); + type_string(_type), mem_consumption()); // Prune cache in lazy mode to save cpu and minimize the time holding write lock PrunedInfo pruned_info = _cache->prune_if(pred, true); COUNTER_SET(_freed_entrys_counter, pruned_info.pruned_count); @@ -105,7 +151,7 @@ class LRUCachePolicy : public CachePolicy { LOG(INFO) << fmt::format( "[MemoryGC] {} not need prune stale, consumption {} less than " "CACHE_MIN_FREE_SIZE {}", - type_string(_type), _cache->mem_consumption(), CACHE_MIN_FREE_SIZE); + type_string(_type), mem_consumption(), CACHE_MIN_FREE_SIZE); } } @@ -115,12 +161,11 @@ class LRUCachePolicy : public CachePolicy { if (_cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) { return; } - if ((force && _cache->mem_consumption() != 0) || - _cache->mem_consumption() > CACHE_MIN_FREE_SIZE) { + if ((force && mem_consumption() != 0) || mem_consumption() > CACHE_MIN_FREE_SIZE) { COUNTER_SET(_cost_timer, (int64_t)0); SCOPED_TIMER(_cost_timer); LOG(INFO) << fmt::format("[MemoryGC] {} prune all start, consumption {}", - type_string(_type), _cache->mem_consumption()); + type_string(_type), mem_consumption()); PrunedInfo pruned_info = _cache->prune(); COUNTER_SET(_freed_entrys_counter, pruned_info.pruned_count); COUNTER_SET(_freed_memory_counter, pruned_info.pruned_size); @@ -133,16 +178,25 @@ class LRUCachePolicy : public CachePolicy { LOG(INFO) << fmt::format( "[MemoryGC] {} not need prune all, force is {}, consumption {}, " "CACHE_MIN_FREE_SIZE {}", - type_string(_type), force, _cache->mem_consumption(), CACHE_MIN_FREE_SIZE); + type_string(_type), force, mem_consumption(), CACHE_MIN_FREE_SIZE); } } +private: + // LRUCacheType::SIZE equal to total_size. + size_t _get_bytes_with_handle(const CacheKey& key, size_t charge, size_t bytes) { + size_t handle_size = sizeof(LRUHandle) - 1 + key.size(); + DCHECK(_lru_cache_type == LRUCacheType::SIZE || bytes != -1) + << " _type " << type_string(_type); + // if LRUCacheType::NUMBER and bytes equals 0, such as some caches cannot accurately track memory size. + // cache mem tracker value and _usage divided by handle_size(106) will get the number of cache entries. + return _lru_cache_type == LRUCacheType::SIZE ? handle_size + charge : handle_size + bytes; + } + // if check_capacity failed, will return dummy lru cache, // compatible with ShardedLRUCache usage, but will not actually cache. - Cache* cache() const { return _cache.get(); } - -private: std::shared_ptr _cache; + LRUCacheType _lru_cache_type; }; } // namespace doris diff --git a/be/src/runtime/memory/lru_cache_value_base.h b/be/src/runtime/memory/lru_cache_value_base.h new file mode 100644 index 00000000000000..2bb06dfaad5fdf --- /dev/null +++ b/be/src/runtime/memory/lru_cache_value_base.h @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "runtime/memory/cache_manager.h" +#include "runtime/memory/mem_tracker_limiter.h" + +namespace doris { + +// Base of the lru cache value. +class LRUCacheValueBase { +public: + LRUCacheValueBase() = delete; + LRUCacheValueBase(CachePolicy::CacheType type) { + _mem_tracker = CacheManager::instance()->get_cache(type)->mem_tracker(); + } + LRUCacheValueBase(const std::shared_ptr& mem_tracker) + : _mem_tracker(mem_tracker) {} + + virtual ~LRUCacheValueBase() { + if (_tracking_bytes > 0) { + // value not alloc use Allocator + _mem_tracker->cache_consume(-_tracking_bytes); + } + } + + void set_tracking_bytes(size_t tracking_bytes) { this->_tracking_bytes = tracking_bytes; } + + std::shared_ptr mem_tracker() { return _mem_tracker; } + +protected: + size_t _tracking_bytes = 0; + std::shared_ptr _mem_tracker = nullptr; +}; + +} // namespace doris diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index a6ee35b1c4574f..21f82e42ec72a0 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -112,7 +112,7 @@ int64_t Reusable::mem_size() const { LookupConnectionCache* LookupConnectionCache::create_global_instance(size_t capacity) { DCHECK(ExecEnv::GetInstance()->get_lookup_connection_cache() == nullptr); - LookupConnectionCache* res = new LookupConnectionCache(capacity); + auto* res = new LookupConnectionCache(capacity); return res; } @@ -124,7 +124,7 @@ RowCache::RowCache(int64_t capacity, int num_shards) // Create global instance of this class RowCache* RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) { DCHECK(ExecEnv::GetInstance()->get_row_cache() == nullptr); - RowCache* res = new RowCache(capacity, num_shards); + auto* res = new RowCache(capacity, num_shards); return res; } @@ -134,29 +134,30 @@ RowCache* RowCache::instance() { bool RowCache::lookup(const RowCacheKey& key, CacheHandle* handle) { const std::string& encoded_key = key.encode(); - auto lru_handle = cache()->lookup(encoded_key); + auto* lru_handle = LRUCachePolicy::lookup(encoded_key); if (!lru_handle) { // cache miss return false; } - *handle = CacheHandle(cache(), lru_handle); + *handle = CacheHandle(this, lru_handle); return true; } void RowCache::insert(const RowCacheKey& key, const Slice& value) { - auto deleter = [](const doris::CacheKey& key, void* value) { free(value); }; char* cache_value = static_cast(malloc(value.size)); memcpy(cache_value, value.data, value.size); + auto* row_cache_value = new RowCacheValue; + row_cache_value->cache_value = cache_value; const std::string& encoded_key = key.encode(); - auto handle = - cache()->insert(encoded_key, cache_value, value.size, deleter, CachePriority::NORMAL); + auto* handle = LRUCachePolicy::insert(encoded_key, row_cache_value, value.size, value.size, + CachePriority::NORMAL); // handle will released - auto tmp = CacheHandle {cache(), handle}; + auto tmp = CacheHandle {this, handle}; } void RowCache::erase(const RowCacheKey& key) { const std::string& encoded_key = key.encode(); - cache()->erase(encoded_key); + LRUCachePolicy::erase(encoded_key); } Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request, diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index 4fc1471fbc3a18..58cf2736313a90 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -127,13 +127,21 @@ class RowCache : public LRUCachePolicy { } }; + class RowCacheValue : public LRUCacheValueBase { + public: + RowCacheValue() : LRUCacheValueBase(CachePolicy::CacheType::POINT_QUERY_ROW_CACHE) {} + ~RowCacheValue() override { free(cache_value); } + char* cache_value; + }; + // A handle for RowCache entry. This class make it easy to handle // Cache entry. Users don't need to release the obtained cache entry. This // class will release the cache entry when it is destroyed. class CacheHandle { public: CacheHandle() = default; - CacheHandle(Cache* cache, Cache::Handle* handle) : _cache(cache), _handle(handle) {} + CacheHandle(LRUCachePolicy* cache, Cache::Handle* handle) + : _cache(cache), _handle(handle) {} ~CacheHandle() { if (_handle != nullptr) { _cache->release(_handle); @@ -153,11 +161,14 @@ class RowCache : public LRUCachePolicy { bool valid() { return _cache != nullptr && _handle != nullptr; } - Cache* cache() const { return _cache; } - Slice data() const { return _cache->value_slice(_handle); } + LRUCachePolicy* cache() const { return _cache; } + Slice data() const { + return {(char*)((RowCacheValue*)_cache->value(_handle))->cache_value, + reinterpret_cast(_handle)->charge}; + } private: - Cache* _cache = nullptr; + LRUCachePolicy* _cache = nullptr; Cache::Handle* _handle = nullptr; // Don't allow copy and assign @@ -206,7 +217,7 @@ class LookupConnectionCache : public LRUCachePolicy { LRUCacheType::SIZE, config::tablet_lookup_cache_stale_sweep_time_sec) { } - std::string encode_key(__int128_t cache_id) { + static std::string encode_key(__int128_t cache_id) { fmt::memory_buffer buffer; fmt::format_to(buffer, "{}", cache_id); return std::string(buffer.data(), buffer.size()); @@ -214,33 +225,31 @@ class LookupConnectionCache : public LRUCachePolicy { void add(__int128_t cache_id, std::shared_ptr item) { std::string key = encode_key(cache_id); - CacheValue* value = new CacheValue; + auto* value = new CacheValue; value->item = item; - auto deleter = [](const doris::CacheKey& key, void* value) { - CacheValue* cache_value = (CacheValue*)value; - delete cache_value; - }; LOG(INFO) << "Add item mem size " << item->mem_size() - << ", cache_capacity: " << cache()->get_total_capacity() - << ", cache_usage: " << cache()->get_usage() - << ", mem_consum: " << cache()->mem_consumption(); - auto lru_handle = - cache()->insert(key, value, item->mem_size(), deleter, CachePriority::NORMAL); - cache()->release(lru_handle); + << ", cache_capacity: " << get_total_capacity() + << ", cache_usage: " << get_usage() << ", mem_consum: " << mem_consumption(); + auto* lru_handle = + insert(key, value, item->mem_size(), item->mem_size(), CachePriority::NORMAL); + release(lru_handle); } std::shared_ptr get(__int128_t cache_id) { std::string key = encode_key(cache_id); - auto lru_handle = cache()->lookup(key); + auto* lru_handle = lookup(key); if (lru_handle) { - Defer release([cache = cache(), lru_handle] { cache->release(lru_handle); }); - auto value = (CacheValue*)cache()->value(lru_handle); + Defer release([cache = this, lru_handle] { cache->release(lru_handle); }); + auto* value = (CacheValue*)(LRUCachePolicy::value(lru_handle)); return value->item; } return nullptr; } - struct CacheValue { + class CacheValue : public LRUCacheValueBase { + public: + CacheValue() : LRUCacheValueBase(CachePolicy::CacheType::LOOKUP_CONNECTION_CACHE) {} + std::shared_ptr item; }; }; diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index aceb4a7354cf62..d597d9aa392370 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -156,8 +156,6 @@ DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(query_cache_memory_total_byte, MetricUni DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(query_cache_sql_total_count, MetricUnit::NOUNIT); DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(query_cache_partition_total_count, MetricUnit::NOUNIT); -DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(lru_cache_memory_bytes, MetricUnit::BYTES); - DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(upload_total_byte, MetricUnit::BYTES); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(upload_rowset_count, MetricUnit::ROWSETS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(upload_fail_count, MetricUnit::ROWSETS); @@ -277,8 +275,6 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_UGAUGE_METRIC_REGISTER(_server_metric_entity, query_cache_sql_total_count); INT_UGAUGE_METRIC_REGISTER(_server_metric_entity, query_cache_partition_total_count); - INT_GAUGE_METRIC_REGISTER(_server_metric_entity, lru_cache_memory_bytes); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_file_reader_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_file_reader_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, hdfs_file_reader_total); diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index e0aa2625cc83d4..01247964aa7775 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -190,8 +190,6 @@ class DorisMetrics { UIntGauge* query_cache_sql_total_count = nullptr; UIntGauge* query_cache_partition_total_count = nullptr; - IntGauge* lru_cache_memory_bytes = nullptr; - UIntGauge* scanner_thread_pool_queue_size = nullptr; UIntGauge* add_batch_task_queue_size = nullptr; UIntGauge* send_batch_thread_pool_thread_num = nullptr; diff --git a/be/src/util/obj_lru_cache.cpp b/be/src/util/obj_lru_cache.cpp index 4b61b245f2381a..e4d644842238ac 100644 --- a/be/src/util/obj_lru_cache.cpp +++ b/be/src/util/obj_lru_cache.cpp @@ -30,18 +30,18 @@ bool ObjLRUCache::lookup(const ObjKey& key, CacheHandle* handle) { if (!_enabled) { return false; } - auto lru_handle = cache()->lookup(key.key); + auto* lru_handle = LRUCachePolicy::lookup(key.key); if (!lru_handle) { // cache miss return false; } - *handle = CacheHandle(cache(), lru_handle); + *handle = CacheHandle(this, lru_handle); return true; } void ObjLRUCache::erase(const ObjKey& key) { if (_enabled) { - cache()->erase(key.key); + LRUCachePolicy::erase(key.key); } } diff --git a/be/src/util/obj_lru_cache.h b/be/src/util/obj_lru_cache.h index f43e7a1e8d27e3..46832db9ad73a0 100644 --- a/be/src/util/obj_lru_cache.h +++ b/be/src/util/obj_lru_cache.h @@ -33,10 +33,24 @@ class ObjLRUCache : public LRUCachePolicy { std::string key; }; + template + class ObjValue : public LRUCacheValueBase { + public: + ObjValue(const T* value) + : LRUCacheValueBase(CachePolicy::CacheType::COMMON_OBJ_LRU_CACHE), value(value) {} + ~ObjValue() override { + T* v = (T*)value; + delete v; + } + + const T* value; + }; + class CacheHandle { public: CacheHandle() = default; - CacheHandle(Cache* cache, Cache::Handle* handle) : _cache(cache), _handle(handle) {} + CacheHandle(LRUCachePolicy* cache, Cache::Handle* handle) + : _cache(cache), _handle(handle) {} ~CacheHandle() { if (_handle != nullptr) { _cache->release(_handle); @@ -56,11 +70,14 @@ class ObjLRUCache : public LRUCachePolicy { bool valid() { return _cache != nullptr && _handle != nullptr; } - Cache* cache() const { return _cache; } - void* data() const { return _cache->value(_handle); } + LRUCachePolicy* cache() const { return _cache; } + template + void* data() const { + return (void*)((ObjValue*)_cache->value(_handle))->value; + } private: - Cache* _cache = nullptr; + LRUCachePolicy* _cache = nullptr; Cache::Handle* _handle = nullptr; // Don't allow copy and assign @@ -73,21 +90,12 @@ class ObjLRUCache : public LRUCachePolicy { template void insert(const ObjKey& key, const T* value, CacheHandle* cache_handle) { - auto deleter = [](const doris::CacheKey& key, void* value) { - T* v = (T*)value; - delete v; - }; - insert(key, value, cache_handle, deleter); - } - - template - void insert(const ObjKey& key, const T* value, CacheHandle* cache_handle, - void (*deleter)(const CacheKey& key, void* value)) { if (_enabled) { const std::string& encoded_key = key.key; - auto handle = cache()->insert(encoded_key, (void*)value, 1, deleter, - CachePriority::NORMAL, sizeof(T)); - *cache_handle = CacheHandle {cache(), handle}; + auto* obj_value = new ObjValue(value); + auto* handle = LRUCachePolicy::insert(encoded_key, obj_value, 1, sizeof(T), + CachePriority::NORMAL); + *cache_handle = CacheHandle {this, handle}; } else { cache_handle = nullptr; } diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 250604e218f21a..759ccef1a7faec 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -263,7 +263,7 @@ Status ParquetReader::_open_file() { _column_statistics.meta_read_calls += 1; } - _file_metadata = (FileMetaData*)_meta_cache_handle.data(); + _file_metadata = (FileMetaData*)_meta_cache_handle.data(); } if (_file_metadata == nullptr) { diff --git a/be/test/olap/lru_cache_test.cpp b/be/test/olap/lru_cache_test.cpp index e8a8b81efe6129..0d1890fe4de783 100644 --- a/be/test/olap/lru_cache_test.cpp +++ b/be/test/olap/lru_cache_test.cpp @@ -26,6 +26,7 @@ #include "gtest/gtest_pred_impl.h" #include "runtime/memory/lru_cache_policy.h" +#include "runtime/memory/lru_cache_value_base.h" #include "runtime/memory/mem_tracker_limiter.h" #include "testutil/test_util.h" @@ -68,10 +69,26 @@ class CacheTest : public testing::Test { public: static CacheTest* _s_current; - static void Deleter(const CacheKey& key, void* v) { - _s_current->_deleted_keys.push_back(DecodeKey(key)); - _s_current->_deleted_values.push_back(DecodeValue(v)); - } + class CacheValueWithKey : public LRUCacheValueBase { + public: + CacheValueWithKey(int key, void* value) + : LRUCacheValueBase(CachePolicy::CacheType::FOR_UT), key(key), value(value) {} + ~CacheValueWithKey() override { + _s_current->_deleted_keys.push_back(key); + _s_current->_deleted_values.push_back(DecodeValue(value)); + } + + int key; + void* value; + }; + + class CacheValue : public LRUCacheValueBase { + public: + CacheValue(void* value) : LRUCacheValueBase(CachePolicy::CacheType::FOR_UT), value(value) {} + ~CacheValue() override = default; + + void* value; + }; class CacheTestPolicy : public LRUCachePolicy { public: @@ -92,12 +109,14 @@ class CacheTest : public testing::Test { ~CacheTest() override { delete _cache; } - Cache* cache() const { return _cache->cache(); } + LRUCachePolicy* cache() const { return _cache; } int Lookup(int key) const { std::string result; Cache::Handle* handle = cache()->lookup(EncodeKey(&result, key)); - const int r = (handle == nullptr) ? -1 : DecodeValue(cache()->value(handle)); + const int r = (handle == nullptr) + ? -1 + : DecodeValue(((CacheValueWithKey*)cache()->value(handle))->value); if (handle != nullptr) { cache()->release(handle); @@ -108,14 +127,17 @@ class CacheTest : public testing::Test { void Insert(int key, int value, int charge) const { std::string result; - cache()->release(cache()->insert(EncodeKey(&result, key), EncodeValue(value), charge, - &CacheTest::Deleter)); + CacheKey cache_key = EncodeKey(&result, key); + auto* cache_value = new CacheValueWithKey(DecodeKey(cache_key), EncodeValue(value)); + cache()->release(cache()->insert(cache_key, cache_value, charge, charge)); } void InsertDurable(int key, int value, int charge) const { std::string result; - cache()->release(cache()->insert(EncodeKey(&result, key), EncodeValue(value), charge, - &CacheTest::Deleter, CachePriority::DURABLE)); + CacheKey cache_key = EncodeKey(&result, key); + auto* cache_value = new CacheValueWithKey(DecodeKey(cache_key), EncodeValue(value)); + cache()->release( + cache()->insert(cache_key, cache_value, charge, charge, CachePriority::DURABLE)); } void Erase(int key) const { @@ -175,12 +197,12 @@ TEST_F(CacheTest, EntriesArePinned) { Insert(100, 101, 1); std::string result1; Cache::Handle* h1 = cache()->lookup(EncodeKey(&result1, 100)); - EXPECT_EQ(101, DecodeValue(cache()->value(h1))); + EXPECT_EQ(101, DecodeValue(((CacheValueWithKey*)cache()->value(h1))->value)); Insert(100, 102, 1); std::string result2; Cache::Handle* h2 = cache()->lookup(EncodeKey(&result2, 100)); - EXPECT_EQ(102, DecodeValue(cache()->value(h2))); + EXPECT_EQ(102, DecodeValue(((CacheValueWithKey*)cache()->value(h2))->value)); EXPECT_EQ(0, _deleted_keys.size()); cache()->release(h1); @@ -230,61 +252,53 @@ TEST_F(CacheTest, EvictionPolicyWithDurable) { EXPECT_EQ(201, Lookup(200)); } -static void deleter(const CacheKey& key, void* v) {} - static void insert_LRUCache(LRUCache& cache, const CacheKey& key, int value, CachePriority priority) { uint32_t hash = key.hash(key.data(), key.size(), 0); - static std::unique_ptr lru_cache_tracker = - std::make_unique(MemTrackerLimiter::Type::GLOBAL, - "TestSizeLruCache"); - cache.release(cache.insert(key, hash, EncodeValue(value), value, &deleter, - lru_cache_tracker.get(), priority, value)); + auto* cache_value = new CacheTest::CacheValue(EncodeValue(value)); + cache.release(cache.insert(key, hash, cache_value, value, priority)); } static void insert_number_LRUCache(LRUCache& cache, const CacheKey& key, int value, int charge, CachePriority priority) { uint32_t hash = key.hash(key.data(), key.size(), 0); - static std::unique_ptr lru_cache_tracker = - std::make_unique(MemTrackerLimiter::Type::GLOBAL, - "TestNumberLruCache"); - cache.release(cache.insert(key, hash, EncodeValue(value), charge, &deleter, - lru_cache_tracker.get(), priority, value)); + auto* cache_value = new CacheTest::CacheValue(EncodeValue(value)); + cache.release(cache.insert(key, hash, cache_value, charge, priority)); } TEST_F(CacheTest, Usage) { LRUCache cache(LRUCacheType::SIZE); - cache.set_capacity(1050); + cache.set_capacity(1040); // The lru usage is handle_size + charge. - // handle_size = sizeof(handle) - 1 + key size = 120 - 1 + 3 = 122 + // handle_size = sizeof(handle) - 1 + key size = 96 - 1 + 3 = 98 CacheKey key1("100"); insert_LRUCache(cache, key1, 100, CachePriority::NORMAL); - ASSERT_EQ(222, cache.get_usage()); // 100 + 122 + ASSERT_EQ(198, cache.get_usage()); // 100 + 98 CacheKey key2("200"); insert_LRUCache(cache, key2, 200, CachePriority::DURABLE); - ASSERT_EQ(544, cache.get_usage()); // 222 + 322(d), d = DURABLE + ASSERT_EQ(496, cache.get_usage()); // 198 + 298(d), d = DURABLE CacheKey key3("300"); insert_LRUCache(cache, key3, 300, CachePriority::NORMAL); - ASSERT_EQ(966, cache.get_usage()); // 222 + 322(d) + 422 + ASSERT_EQ(894, cache.get_usage()); // 198 + 298(d) + 398 CacheKey key4("400"); insert_LRUCache(cache, key4, 400, CachePriority::NORMAL); - ASSERT_EQ(844, cache.get_usage()); // 322(d) + 522, evict 222 422 + ASSERT_EQ(796, cache.get_usage()); // 298(d) + 498, evict 198 398 CacheKey key5("500"); insert_LRUCache(cache, key5, 500, CachePriority::NORMAL); - ASSERT_EQ(944, cache.get_usage()); // 322(d) + 622, evict 522 + ASSERT_EQ(896, cache.get_usage()); // 298(d) + 598, evict 498 CacheKey key6("600"); insert_LRUCache(cache, key6, 600, CachePriority::NORMAL); - ASSERT_EQ(1044, cache.get_usage()); // 322(d) + 722, evict 622 + ASSERT_EQ(996, cache.get_usage()); // 298(d) + 698, evict 598 CacheKey key7("950"); insert_LRUCache(cache, key7, 950, CachePriority::DURABLE); - ASSERT_EQ(0, cache.get_usage()); // evict 322 722, because 950 + 122 > 1050, so insert failed + ASSERT_EQ(0, cache.get_usage()); // evict 298 698, because 950 + 98 > 1040, so insert failed } TEST_F(CacheTest, Prune) { @@ -325,7 +339,7 @@ TEST_F(CacheTest, Prune) { EXPECT_EQ(5, cache.get_usage()); auto pred2 = [](const LRUHandle* handle) -> bool { - return DecodeValue((void*)(handle->value)) > 400; + return DecodeValue((void*)(((CacheValue*)handle->value)->value)) > 400; }; cache.prune_if(pred2); EXPECT_EQ(2, cache.get_usage()); @@ -381,7 +395,7 @@ TEST_F(CacheTest, PruneIfLazyMode) { // in lazy mode, the first item not satisfied the pred2, `prune_if` then stopped // and no item's removed. auto pred2 = [](const LRUHandle* handle) -> bool { - return DecodeValue((void*)(handle->value)) > 400; + return DecodeValue((void*)(((CacheValue*)handle->value)->value)) > 400; }; cache.prune_if(pred2, true); EXPECT_EQ(7, cache.get_usage()); @@ -389,7 +403,7 @@ TEST_F(CacheTest, PruneIfLazyMode) { // in normal priority, 100, 300 are removed // in durable priority, 200 is removed auto pred3 = [](const LRUHandle* handle) -> bool { - return DecodeValue((void*)(handle->value)) <= 600; + return DecodeValue((void*)(((CacheValue*)handle->value)->value)) <= 600; }; PrunedInfo pruned_info = cache.prune_if(pred3, true); EXPECT_EQ(3, pruned_info.pruned_count); @@ -441,7 +455,7 @@ TEST_F(CacheTest, Number) { EXPECT_EQ(2, cache.get_usage()); auto pred2 = [](const LRUHandle* handle) -> bool { - return DecodeValue((void*)(handle->value)) > 100; + return DecodeValue((void*)(((CacheValue*)handle->value)->value)) > 100; }; cache.prune_if(pred2); EXPECT_EQ(1, cache.get_usage()); @@ -509,7 +523,6 @@ TEST(CacheHandleTest, HandleTableTest) { CacheKey* key = &keys[i]; auto* h = reinterpret_cast(malloc(sizeof(LRUHandle) - 1 + key->size())); h->value = nullptr; - h->deleter = nullptr; h->charge = 1; h->total_size = sizeof(LRUHandle) - 1 + key->size() + 1; h->key_length = key->size(); @@ -542,7 +555,6 @@ TEST(CacheHandleTest, HandleTableTest) { CacheKey* key = &keys[i]; auto* h = reinterpret_cast(malloc(sizeof(LRUHandle) - 1 + key->size())); h->value = nullptr; - h->deleter = nullptr; h->charge = 1; h->total_size = sizeof(LRUHandle) - 1 + key->size() + 1; h->key_length = key->size(); diff --git a/be/test/olap/page_cache_test.cpp b/be/test/olap/page_cache_test.cpp index 654d8a4341fce1..9dcc5dad485590 100644 --- a/be/test/olap/page_cache_test.cpp +++ b/be/test/olap/page_cache_test.cpp @@ -28,12 +28,17 @@ static int kNumShards = StoragePageCache::kDefaultNumShards; class StoragePageCacheTest : public testing::Test { public: - StoragePageCacheTest() {} - virtual ~StoragePageCacheTest() {} + StoragePageCacheTest() { + mem_tracker = std::make_shared(MemTrackerLimiter::Type::GLOBAL, + "StoragePageCacheTest"); + } + ~StoragePageCacheTest() override = default; + + std::shared_ptr mem_tracker; }; // All cache space is allocated to data pages -TEST(StoragePageCacheTest, data_page_only) { +TEST_F(StoragePageCacheTest, data_page_only) { StoragePageCache cache(kNumShards * 2048, 0, 0, kNumShards); StoragePageCache::CacheKey key("abc", 0, 0); @@ -44,7 +49,7 @@ TEST(StoragePageCacheTest, data_page_only) { { // insert normal page PageCacheHandle handle; - DataPage* data = new DataPage(1024); + auto* data = new DataPage(1024, mem_tracker); cache.insert(key, data, &handle, page_type, false); EXPECT_EQ(handle.data().data, data->data()); @@ -57,7 +62,7 @@ TEST(StoragePageCacheTest, data_page_only) { { // insert in_memory page PageCacheHandle handle; - DataPage* data = new DataPage(1024); + auto* data = new DataPage(1024, mem_tracker); cache.insert(memory_key, data, &handle, page_type, true); EXPECT_EQ(handle.data().data, data->data()); @@ -70,7 +75,7 @@ TEST(StoragePageCacheTest, data_page_only) { for (int i = 0; i < 10 * kNumShards; ++i) { StoragePageCache::CacheKey key("bcd", 0, i); PageCacheHandle handle; - DataPage* data = new DataPage(1024); + auto* data = new DataPage(1024, mem_tracker); cache.insert(key, data, &handle, page_type, false); } @@ -99,7 +104,7 @@ TEST(StoragePageCacheTest, data_page_only) { } // All cache space is allocated to index pages -TEST(StoragePageCacheTest, index_page_only) { +TEST_F(StoragePageCacheTest, index_page_only) { StoragePageCache cache(kNumShards * 2048, 100, 0, kNumShards); StoragePageCache::CacheKey key("abc", 0, 0); @@ -110,7 +115,7 @@ TEST(StoragePageCacheTest, index_page_only) { { // insert normal page PageCacheHandle handle; - DataPage* data = new DataPage(1024); + auto* data = new DataPage(1024, mem_tracker); cache.insert(key, data, &handle, page_type, false); EXPECT_EQ(handle.data().data, data->data()); @@ -123,7 +128,7 @@ TEST(StoragePageCacheTest, index_page_only) { { // insert in_memory page PageCacheHandle handle; - DataPage* data = new DataPage(1024); + auto* data = new DataPage(1024, mem_tracker); cache.insert(memory_key, data, &handle, page_type, true); EXPECT_EQ(handle.data().data, data->data()); @@ -136,7 +141,7 @@ TEST(StoragePageCacheTest, index_page_only) { for (int i = 0; i < 10 * kNumShards; ++i) { StoragePageCache::CacheKey key("bcd", 0, i); PageCacheHandle handle; - DataPage* data = new DataPage(1024); + auto* data = new DataPage(1024, mem_tracker); cache.insert(key, data, &handle, page_type, false); } @@ -165,7 +170,7 @@ TEST(StoragePageCacheTest, index_page_only) { } // Cache space is allocated by index_page_cache_ratio -TEST(StoragePageCacheTest, mixed_pages) { +TEST_F(StoragePageCacheTest, mixed_pages) { StoragePageCache cache(kNumShards * 2048, 10, 0, kNumShards); StoragePageCache::CacheKey data_key("data", 0, 0); @@ -179,8 +184,8 @@ TEST(StoragePageCacheTest, mixed_pages) { { // insert both normal pages PageCacheHandle data_handle, index_handle; - DataPage* data = new DataPage(1024); - DataPage* index = new DataPage(1024); + auto* data = new DataPage(1024, mem_tracker); + auto* index = new DataPage(1024, mem_tracker); cache.insert(data_key, data, &data_handle, page_type_data, false); cache.insert(index_key, index, &index_handle, page_type_index, false); @@ -198,8 +203,8 @@ TEST(StoragePageCacheTest, mixed_pages) { { // insert both in_memory pages PageCacheHandle data_handle, index_handle; - DataPage* data = new DataPage(1024); - DataPage* index = new DataPage(1024); + auto* data = new DataPage(1024, mem_tracker); + auto* index = new DataPage(1024, mem_tracker); cache.insert(data_key_mem, data, &data_handle, page_type_data, true); cache.insert(index_key_mem, index, &index_handle, page_type_index, true); @@ -216,8 +221,8 @@ TEST(StoragePageCacheTest, mixed_pages) { for (int i = 0; i < 10 * kNumShards; ++i) { StoragePageCache::CacheKey key("bcd", 0, i); PageCacheHandle handle; - std::unique_ptr data = std::make_unique(1024); - std::unique_ptr index = std::make_unique(1024); + std::unique_ptr data = std::make_unique(1024, mem_tracker); + std::unique_ptr index = std::make_unique(1024, mem_tracker); cache.insert(key, data.release(), &handle, page_type_data, false); cache.insert(key, index.release(), &handle, page_type_index, false); } @@ -237,8 +242,8 @@ TEST(StoragePageCacheTest, mixed_pages) { PageCacheHandle data_handle, index_handle; StoragePageCache::CacheKey miss_key_data("data_miss", 0, 1); StoragePageCache::CacheKey miss_key_index("index_miss", 0, 1); - std::unique_ptr data = std::make_unique(1024); - std::unique_ptr index = std::make_unique(1024); + std::unique_ptr data = std::make_unique(1024, mem_tracker); + std::unique_ptr index = std::make_unique(1024, mem_tracker); cache.insert(miss_key_data, data.release(), &data_handle, page_type_data, false); cache.insert(miss_key_index, index.release(), &index_handle, page_type_index, false);