Skip to content

Commit

Permalink
[enhancement](cloud) file cache evict in advance (#47473)
Browse files Browse the repository at this point in the history
Evict file cache blocks in advance when the current cache size exceeds a threshold, so that
queries do not have to evict synchronously, which may hurt query performance.
  • Loading branch information
freemandealer authored Feb 7, 2025
1 parent 67d58f1 commit 20c2dd0
Show file tree
Hide file tree
Showing 5 changed files with 416 additions and 21 deletions.
6 changes: 6 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,12 @@ DEFINE_Bool(clear_file_cache, "false");
DEFINE_Bool(enable_file_cache_query_limit, "false");
DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "88");
DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
// Master switch for evicting file cache in advance. When true, the background monitor
// periodically re-evaluates whether eviction in advance is needed; when false, the monitor
// clears the "need evict in advance" flag on every pass.
DEFINE_mBool(enable_evict_file_cache_in_advance, "true");
// Enter threshold (percent). The "need evict in advance" state is entered when disk space usage,
// inode usage, or cache size / capacity reaches this value.
DEFINE_mInt32(file_cache_enter_need_evict_cache_in_advance_percent, "78");
// Exit threshold (percent). The state is left only once all three usages drop below this value.
// Must be strictly less than the enter threshold; otherwise both are reset to 78 / 75 at runtime.
DEFINE_mInt32(file_cache_exit_need_evict_cache_in_advance_percent, "75");
// Time the background evict-in-advance thread waits between two rounds, in milliseconds.
DEFINE_mInt32(file_cache_evict_in_advance_interval_ms, "1000");
// Size, in bytes, passed to try_evict_in_advance() in each round of the background thread.
DEFINE_mInt64(file_cache_evict_in_advance_batch_bytes, "31457280"); // 30MB

DEFINE_mBool(enable_read_cache_file_directly, "false");
DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "true");
// If true, evict the ttl cache using LRU when full.
Expand Down
5 changes: 5 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,11 @@ DECLARE_Bool(clear_file_cache);
DECLARE_Bool(enable_file_cache_query_limit);
// NOTE(review): the two disk-resource-limit options below are defined with DEFINE_mInt32 in
// config.cpp but declared with the non-`m` macro here - confirm this mismatch is intentional.
DECLARE_Int32(file_cache_enter_disk_resource_limit_mode_percent);
DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent);
// Master switch for evicting file cache in advance (checked by the background monitor).
DECLARE_mBool(enable_evict_file_cache_in_advance);
// Enter / exit thresholds (percent) for the "need evict in advance" state. The state is entered
// when disk space, inode, or cache size usage reaches the enter value, and left once all of them
// are below the exit value. The enter value must be greater than the exit value.
DECLARE_mInt32(file_cache_enter_need_evict_cache_in_advance_percent);
DECLARE_mInt32(file_cache_exit_need_evict_cache_in_advance_percent);
// Wait time between two rounds of the background evict-in-advance thread, in milliseconds.
DECLARE_mInt32(file_cache_evict_in_advance_interval_ms);
// Size, in bytes, requested from try_evict_in_advance() in each round.
DECLARE_mInt64(file_cache_evict_in_advance_batch_bytes);
DECLARE_mBool(enable_read_cache_file_directly);
// NOTE(review): defined with DEFINE_mBool in config.cpp but declared with DECLARE_Bool here -
// confirm this mismatch is intentional.
DECLARE_Bool(file_cache_enable_evict_from_other_queue_by_size);
// If true, evict the ttl cache using LRU when full.
Expand Down
146 changes: 130 additions & 16 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path,
"file_cache_hit_ratio_1h", 0.0);
_disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>(
_cache_base_path.c_str(), "file_cache_disk_limit_mode", 0);
_need_evict_cache_in_advance_metrics = std::make_shared<bvar::Status<size_t>>(
_cache_base_path.c_str(), "file_cache_need_evict_cache_in_advance", 0);

_cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us");
Expand All @@ -212,6 +214,11 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path,
_cache_base_path.c_str(), "file_cache_storage_retry_sync_remove_latency_us");
_storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us");
_evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us");

_recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(), "file_cache_recycle_keys_length");

_disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
cache_settings.disposable_queue_elements, 60 * 60);
Expand Down Expand Up @@ -339,6 +346,8 @@ Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lo
_cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
_cache_background_ttl_gc_thread = std::thread(&BlockFileCache::run_background_ttl_gc, this);
_cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
_cache_background_evict_in_advance_thread =
std::thread(&BlockFileCache::run_background_evict_in_advance, this);

return Status::OK();
}
Expand Down Expand Up @@ -1021,6 +1030,16 @@ bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext&
return true;
}

void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
UInt128Wrapper hash = UInt128Wrapper();
size_t offset = 0;
CacheContext context;
context.cache_type = FileCacheType::NORMAL;
try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, false);
context.cache_type = FileCacheType::TTL;
try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, false);
}

bool BlockFileCache::remove_if_ttl_file_blocks(const UInt128Wrapper& file_key, bool remove_directly,
std::lock_guard<std::mutex>& cache_lock, bool sync) {
auto& ttl_queue = get_queue(FileCacheType::TTL);
Expand Down Expand Up @@ -1178,7 +1197,7 @@ void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size

bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
int64_t cur_time, std::lock_guard<std::mutex>& cache_lock) {
int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool sync_removal) {
size_t removed_size = 0;
size_t cur_cache_size = _cur_cache_size;
std::vector<FileBlockCell*> to_evict;
Expand Down Expand Up @@ -1211,7 +1230,7 @@ bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
}
*(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
}
remove_file_blocks(to_evict, cache_lock, true);
remove_file_blocks(to_evict, cache_lock, sync_removal);

return !is_overflow(removed_size, size, cur_cache_size);
}
Expand All @@ -1229,7 +1248,7 @@ bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size,

bool BlockFileCache::try_reserve_from_other_queue_by_size(
FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
std::lock_guard<std::mutex>& cache_lock) {
std::lock_guard<std::mutex>& cache_lock, bool sync_removal) {
size_t removed_size = 0;
size_t cur_cache_size = _cur_cache_size;
std::vector<FileBlockCell*> to_evict;
Expand All @@ -1249,17 +1268,18 @@ bool BlockFileCache::try_reserve_from_other_queue_by_size(
cur_removed_size);
*(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
}
remove_file_blocks(to_evict, cache_lock, true);
remove_file_blocks(to_evict, cache_lock, sync_removal);
return !is_overflow(removed_size, size, cur_cache_size);
}

bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
int64_t cur_time,
std::lock_guard<std::mutex>& cache_lock) {
std::lock_guard<std::mutex>& cache_lock,
bool sync_removal) {
// currently, TTL cache is not considered as a candidate
auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
bool reserve_success = try_reserve_from_other_queue_by_time_interval(
cur_cache_type, other_cache_types, size, cur_time, cache_lock);
cur_cache_type, other_cache_types, size, cur_time, cache_lock, sync_removal);
if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
return reserve_success;
}
Expand All @@ -1272,14 +1292,15 @@ bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type,
if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
return false;
}
return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size,
cache_lock);
return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
sync_removal);
}

bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
QueryFileCacheContextPtr query_context,
const CacheContext& context, size_t offset, size_t size,
std::lock_guard<std::mutex>& cache_lock) {
std::lock_guard<std::mutex>& cache_lock,
bool sync_removal) {
int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
Expand All @@ -1292,7 +1313,7 @@ bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
size_t cur_removed_size = 0;
find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
cur_removed_size);
remove_file_blocks(to_evict, cache_lock, true);
remove_file_blocks(to_evict, cache_lock, sync_removal);
*(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;

if (is_overflow(removed_size, size, cur_cache_size)) {
Expand Down Expand Up @@ -1345,7 +1366,9 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo
// so there will be a window that the file is not in the cache but still in the storage
// but it's ok, because the rowset is stale already
bool ret = _recycle_keys.enqueue(key);
if (!ret) {
if (ret) [[likely]] {
*_recycle_keys_length_recorder << _recycle_keys.size_approx();
} else {
LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
int64_t duration_ns = 0;
Status st;
Expand Down Expand Up @@ -1551,6 +1574,10 @@ int disk_used_percentage(const std::string& path, std::pair<int, int>* percent)
int inode_percentage = int(inode_free * 1.0 / inode_total * 100);
percent->first = capacity_percentage;
percent->second = 100 - inode_percentage;

// Add sync point for testing
TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);

return 0;
}

Expand Down Expand Up @@ -1643,7 +1670,7 @@ void BlockFileCache::check_disk_resource_limit() {
LOG_WARNING("config error, set to default value")
.tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
.tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
config::file_cache_enter_disk_resource_limit_mode_percent = 90;
config::file_cache_enter_disk_resource_limit_mode_percent = 88;
config::file_cache_exit_disk_resource_limit_mode_percent = 80;
}
if (is_insufficient(space_percentage) || is_insufficient(inode_percentage)) {
Expand All @@ -1664,11 +1691,69 @@ void BlockFileCache::check_disk_resource_limit() {
}
}

// Re-evaluates whether the cache should be evicted in advance and updates
// `_need_evict_cache_in_advance` together with its metric.
//
// Three usage percentages are considered: disk space and inode usage of the cache base path (as
// reported by disk_used_percentage()) and the cache's own size relative to its capacity.
//  - The state is entered as soon as ANY of them reaches
//    config::file_cache_enter_need_evict_cache_in_advance_percent.
//  - The state is left only when ALL of them are below
//    config::file_cache_exit_need_evict_cache_in_advance_percent.
//  - In between, the previous state is kept.
//
// Does nothing for storage types other than DISK, or when the disk usage query fails.
void BlockFileCache::check_need_evict_cache_in_advance() {
    if (_storage->get_type() != FileCacheStorageType::DISK) {
        return;
    }

    std::pair<int, int> percent;
    int ret = disk_used_percentage(_cache_base_path, &percent);
    if (ret != 0) {
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
        return;
    }
    auto [space_percentage, inode_percentage] = percent;

    // Keep the size percentage as an int so that it has the same type as the other percentages
    // and the int32 config thresholds (no implicit narrowing or signed/unsigned comparison).
    // A zero capacity is reported as 0% instead of converting inf/NaN to an integer, which would
    // be undefined behaviour.
    int size_percentage = 0;
    if (_capacity > 0) {
        size_percentage = static_cast<int>(
                (static_cast<double>(_cur_cache_size) / static_cast<double>(_capacity)) * 100);
    }

    // The threshold is read at call time, so a config reset below is observed by later calls.
    auto is_insufficient = [](int percentage) {
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
    };
    DCHECK_GE(space_percentage, 0);
    DCHECK_LE(space_percentage, 100);
    DCHECK_GE(inode_percentage, 0);
    DCHECK_LE(inode_percentage, 100);
    // ATTN: these thresholds can be changed dynamically, so fall back to the default values when
    // the current combination is invalid (enter must be strictly greater than exit).
    // FIXME: reject with config validator
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
        LOG_WARNING("config error, set to default value")
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
    }

    if (is_insufficient(space_percentage) || is_insufficient(inode_percentage) ||
        is_insufficient(size_percentage)) {
        _need_evict_cache_in_advance = true;
        _need_evict_cache_in_advance_metrics->set_value(1);
    } else if (_need_evict_cache_in_advance &&
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
        _need_evict_cache_in_advance = false;
        _need_evict_cache_in_advance_metrics->set_value(0);
    }

    // Logged on every check for as long as the state is active.
    if (_need_evict_cache_in_advance) {
        LOG(WARNING) << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
                     << " inode_percent=" << inode_percentage << " size_percent=" << size_percentage
                     << " is_space_insufficient=" << is_insufficient(space_percentage)
                     << " is_inode_insufficient=" << is_insufficient(inode_percentage)
                     << " is_size_insufficient=" << is_insufficient(size_percentage)
                     << " need evict cache in advance";
    }
}

void BlockFileCache::run_background_monitor() {
int64_t interval_time_seconds = 20;
while (!_close) {
TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_time_seconds);
check_disk_resource_limit();
if (config::enable_evict_file_cache_in_advance) {
check_need_evict_cache_in_advance();
} else {
_need_evict_cache_in_advance = false;
}

{
std::unique_lock close_lock(_close_mtx);
_close_cv.wait_for(close_lock, std::chrono::seconds(interval_time_seconds));
Expand Down Expand Up @@ -1753,11 +1838,8 @@ void BlockFileCache::run_background_gc() {
break;
}
}
while (_recycle_keys.try_dequeue(key)) {
if (batch_count >= batch_limit) {
break;
}

while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
int64_t duration_ns = 0;
Status st;
{
Expand All @@ -1771,10 +1853,42 @@ void BlockFileCache::run_background_gc() {
}
batch_count++;
}
*_recycle_keys_length_recorder << _recycle_keys.size_approx();
batch_count = 0;
}
}

// Body of the background thread that evicts cache blocks ahead of demand.
//
// Each round waits on `_close_cv` for config::file_cache_evict_in_advance_interval_ms (or until
// notified), exits if the cache is closing, and otherwise - when `_need_evict_cache_in_advance`
// is set and the recycle backlog is below the limit - calls try_evict_in_advance() with
// config::file_cache_evict_in_advance_batch_bytes under the cache lock. The time spent in that
// call (measured after the lock is taken) is recorded in `_evict_in_advance_latency_us`.
void BlockFileCache::run_background_evict_in_advance() {
    LOG(INFO) << "Starting background evict in advance thread";
    int64_t batch = 0;
    while (!_close) {
        {
            std::unique_lock close_lock(_close_mtx);
            _close_cv.wait_for(
                    close_lock,
                    std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
            if (_close) {
                LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
                break;
            }
        }
        // Re-read every round: the batch size can be changed at runtime.
        batch = config::file_cache_evict_in_advance_batch_bytes;

        // A non-positive batch means there is nothing to evict. It must not reach the size_t
        // parameter of try_evict_in_advance(), where a negative value would turn into a huge
        // eviction request.
        if (batch <= 0) {
            continue;
        }
        const auto batch_bytes = static_cast<size_t>(batch);

        // Skip if eviction not needed or too many pending recycles
        // NOTE(review): this compares a number of queued keys against a byte count times ten -
        // confirm the intended backlog threshold.
        if (!_need_evict_cache_in_advance || _recycle_keys.size_approx() >= (batch_bytes * 10)) {
            continue;
        }

        int64_t duration_ns = 0;
        {
            SCOPED_CACHE_LOCK(_mutex, this);
            SCOPED_RAW_TIMER(&duration_ns);
            try_evict_in_advance(batch_bytes, cache_lock);
        }
        *_evict_in_advance_latency_us << (duration_ns / 1000);
    }
}

void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash,
uint64_t new_expiration_time) {
SCOPED_CACHE_LOCK(_mutex, this);
Expand Down
Loading

0 comments on commit 20c2dd0

Please sign in to comment.